http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java index 84f5dbb..de16ef2 100644 --- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java @@ -19,16 +19,17 @@ package org.apache.samza.application; import com.google.common.collect.ImmutableList; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.context.ApplicationContainerContextFactory; +import org.apache.samza.context.ApplicationTaskContextFactory; import org.apache.samza.operators.BaseTableDescriptor; -import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericOutputDescriptor; @@ -521,11 +522,35 @@ public class TestStreamApplicationDescriptorImpl { } @Test - public void testContextManager() { - ContextManager cntxMan = mock(ContextManager.class); - StreamApplication testApp = appDesc -> appDesc.withContextManager(cntxMan); + public void testApplicationContainerContextFactory() { + ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class); + StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory); + StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory)); + } + + @Test + public void testNoApplicationContainerContextFactory() { + StreamApplication testApp = appDesc -> { + }; + StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty()); + } + + @Test + public void testApplicationTaskContextFactory() { + ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class); + StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory); + StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory)); + } + + @Test + public void testNoApplicationTaskContextFactory() { + StreamApplication testApp = appDesc -> { + }; StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class)); - assertEquals(appSpec.getContextManager(), cntxMan); + assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty()); } @Test
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java index abe5ce1..e79e25b 100644 --- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java @@ -21,10 +21,12 @@ package org.apache.samza.application; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.context.ApplicationContainerContextFactory; +import org.apache.samza.context.ApplicationTaskContextFactory; import org.apache.samza.operators.BaseTableDescriptor; -import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; @@ -127,13 +129,35 @@ public class TestTaskApplicationDescriptorImpl { } @Test - public void testContextManager() { - ContextManager cntxMan = mock(ContextManager.class); + public void testApplicationContainerContextFactory() { + ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class); + TaskApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory); + TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory)); + } + + @Test + public void testNoApplicationContainerContextFactory() { TaskApplication testApp = appDesc -> { - appDesc.withContextManager(cntxMan); }; - TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config); - assertEquals(appDesc.getContextManager(), cntxMan); + TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty()); + } + + @Test + public void testApplicationTaskContextFactory() { + ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class); + TaskApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory); + TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory)); + } + + @Test + public void testNoApplicationTaskContextFactory() { + TaskApplication testApp = appDesc -> { + }; + TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class)); + assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty()); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/MockContext.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java b/samza-core/src/test/java/org/apache/samza/context/MockContext.java new file mode 100644 index 0000000..778d486 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.context; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; + +import static org.mockito.Mockito.*; + + +public class MockContext implements Context { + private final JobContext jobContext = mock(JobContext.class); + private final ContainerContext containerContext = mock(ContainerContext.class); + /** + * This is {@link TaskContextImpl} because some tests need more than just the interface. + */ + private final TaskContextImpl taskContext = mock(TaskContextImpl.class); + private final ApplicationContainerContext applicationContainerContext = mock(ApplicationContainerContext.class); + private final ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class); + + public MockContext() { + this(new MapConfig()); + } + + /** + * @param config config is widely used, so help wire it in here + */ + public MockContext(Config config) { + when(this.jobContext.getConfig()).thenReturn(config); + } + + @Override + public JobContext getJobContext() { + return jobContext; + } + + @Override + public ContainerContext getContainerContext() { + return containerContext; + } + + @Override + public TaskContext getTaskContext() { + return taskContext; + } + + @Override + public ApplicationContainerContext getApplicationContainerContext() { + return applicationContainerContext; + } + + @Override + public ApplicationTaskContext getApplicationTaskContext() { + return applicationTaskContext; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java index 33ad3a5..40526db 100644 --- a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java +++ b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java @@ -18,9 +18,11 @@ */ package org.apache.samza.context; +import java.util.Optional; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; public class TestContextImpl { @@ -63,11 +65,17 @@ public class TestContextImpl { } private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) { - return new ContextImpl(null, null, null, applicationContainerContext, null); + return buildWithApplicationContext(applicationContainerContext, mock(ApplicationTaskContext.class)); } private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) { - return new ContextImpl(null, null, null, null, applicationTaskContext); + return buildWithApplicationContext(mock(ApplicationContainerContext.class), applicationTaskContext); + } + + private static Context buildWithApplicationContext(ApplicationContainerContext applicationContainerContext, + ApplicationTaskContext applicationTaskContext) { + return new ContextImpl(mock(JobContext.class), mock(ContainerContext.class), mock(TaskContext.class), + Optional.ofNullable(applicationContainerContext), Optional.ofNullable(applicationTaskContext)); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java index 78f886c..3d3803b 100644 --- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java +++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java @@ -34,6 +34,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -62,7 +63,7 @@ public class TestTaskContextImpl { MockitoAnnotations.initMocks(this); taskContext = new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler, - offsetManager); + offsetManager, null, null); when(this.taskModel.getTaskName()).thenReturn(TASK_NAME); } @@ -95,4 +96,16 @@ public class TestTaskContextImpl { taskContext.setStartingOffset(ssp, "123"); verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123"); } + + /** + * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject + * should return null. + */ + @Test + public void testRegisterAndFetchObject() { + String value = "hello world"; + taskContext.registerObject("key", value); + assertEquals(value, taskContext.fetchObject("key")); + assertNull(taskContext.fetchObject("not a key")); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java index 51a9523..4618e52 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java @@ -19,14 +19,6 @@ package org.apache.samza.execution; import com.google.common.base.Joiner; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.application.TaskApplicationDescriptorImpl; import org.apache.samza.config.Config; @@ -36,7 +28,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; @@ -52,9 +44,17 @@ import org.apache.samza.table.Table; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableProviderFactory; import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; import org.junit.Test; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -445,7 +445,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase } @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + public void init(Context context) { } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 6fa9ed1..1315912 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -19,13 +19,14 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; - import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; -import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; @@ -40,7 +41,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.StreamTestUtils; import org.apache.samza.testUtils.TestClock; @@ -56,10 +56,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.eq; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -304,22 +304,23 @@ public class TestJoinOperator { mapConfig.put("job.id", "jobId"); StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2"); - Config config = new MapConfig(mapConfig); - TaskContextImpl taskContext = mock(TaskContextImpl.class); - when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + Context context = new MockContext(new MapConfig(mapConfig)); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), new SystemStreamPartition("insystem", "instream2", new Partition(0)))); - when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(context.getTaskContext().getTaskModel()).thenReturn(taskModel); + when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); // need to return different stores for left and right side IntegerSerde integerSerde = new IntegerSerde(); TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde)); - when(taskContext.getStore(eq("jobName-jobId-join-j1-L"))) + when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L"))) .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); - when(taskContext.getStore(eq("jobName-jobId-join-j1-R"))) + when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R"))) .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); - StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager(), clock); - sot.init(config, taskContext); + StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), clock); + sot.init(context); return sot; } @@ -357,7 +358,7 @@ public class TestJoinOperator { private int numCloseCalls = 0; @Override - public void init(Config config, TaskContext context) { + public void init(Context context) { numInitCalls++; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 6d12d99..0ff2e0d 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -21,9 +21,9 @@ package org.apache.samza.operators.impl; import java.util.Collection; import java.util.Collections; import java.util.Set; - -import org.apache.samza.config.Config; -import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.ReadableMetricsRegistry; @@ -32,8 +32,8 @@ import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; import org.junit.Test; import static org.mockito.Matchers.anyLong; @@ -46,14 +46,20 @@ import static org.mockito.Mockito.when; public class TestOperatorImpl { + private Context context; + + @Before + public void setup() { + this.context = new MockContext(); + when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class)); + } @Test(expected = IllegalStateException.class) public void testMultipleInitShouldThrow() { OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class)); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - opImpl.init(mock(Config.class), mockTaskContext); - opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(this.context); + opImpl.init(this.context); } @Test(expected = IllegalStateException.class) @@ -64,24 +70,21 @@ public class TestOperatorImpl { @Test public void testOnMessagePropagatesResults() { - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - Object mockTestOpImplOutput = mock(Object.class); OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); - opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(this.context); // register a couple of operators OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); - mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + mockNextOpImpl1.init(this.context); opImpl.registerNextOperator(mockNextOpImpl1); OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); - mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + mockNextOpImpl2.init(this.context); opImpl.registerNextOperator(mockNextOpImpl2); // send a message to this operator @@ -96,9 +99,8 @@ public class TestOperatorImpl { @Test public void testOnMessageUpdatesMetrics() { - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter); @@ -106,7 +108,7 @@ public class TestOperatorImpl { Object mockTestOpImplOutput = mock(Object.class); OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); - opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(this.context); // send a message to this operator MessageCollector mockCollector = mock(MessageCollector.class); @@ -120,24 +122,21 @@ public class TestOperatorImpl { @Test public void testOnTimerPropagatesResultsAndTimer() { - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - Object mockTestOpImplOutput = mock(Object.class); OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); - opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(this.context); // register a couple of operators OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); - mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + mockNextOpImpl1.init(this.context); opImpl.registerNextOperator(mockNextOpImpl1); OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); - mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + mockNextOpImpl2.init(this.context); opImpl.registerNextOperator(mockNextOpImpl2); // send a timer tick to this operator @@ -156,9 +155,8 @@ public class TestOperatorImpl { @Test public void testOnTimerUpdatesMetrics() { - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockMessageCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter); @@ -166,7 +164,7 @@ public class TestOperatorImpl { Object mockTestOpImplOutput = mock(Object.class); OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); - opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(this.context); // send a message to this operator MessageCollector mockCollector = mock(MessageCollector.class); @@ -188,7 +186,7 @@ public class TestOperatorImpl { } @Override - protected void handleInit(Config config, TaskContext context) {} + protected void handleInit(Context context) {} @Override public Collection<Object> handleMessage(Object message, http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 3abd502..d760805 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -21,27 +21,16 @@ package org.apache.samza.operators.impl; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import java.io.Serializable; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.context.TaskContextImpl; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; @@ -67,15 +56,28 @@ import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.StreamTestUtils; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; import org.apache.samza.util.TimestampedValue; import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.io.Serializable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; @@ -84,7 +86,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestOperatorImplGraph { - private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) { List<OperatorImpl> operators = new ArrayList<>(); operators.add(op); @@ -193,25 +194,39 @@ public class TestOperatorImplGraph { } @Override - public void init(Config config, TaskContext context) { - if (perTaskFunctionMap.get(context.getTaskName()) == null) { - perTaskFunctionMap.put(context.getTaskName(), new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } }); + public void init(Context context) { + TaskName taskName = context.getTaskContext().getTaskModel().getTaskName(); + if (perTaskFunctionMap.get(taskName) == null) { + perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } }); } else { - if (perTaskFunctionMap.get(context.getTaskName()).containsKey(opId)) { + if (perTaskFunctionMap.get(taskName).containsKey(opId)) { throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName())); } - perTaskFunctionMap.get(context.getTaskName()).put(opId, this); + perTaskFunctionMap.get(taskName).put(opId, this); } - if (perTaskInitList.get(context.getTaskName()) == null) { - perTaskInitList.put(context.getTaskName(), new ArrayList<String>() { { this.add(opId); } }); + if (perTaskInitList.get(taskName) == null) { + perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } }); } else { - perTaskInitList.get(context.getTaskName()).add(opId); + perTaskInitList.get(taskName).add(opId); } - this.taskName = context.getTaskName(); + this.taskName = taskName; this.numInitCalled++; } } + private Context context; + + @Before + public void setup() { + this.context = new MockContext(); + // individual tests can override this config if necessary + when(this.context.getJobContext().getConfig()).thenReturn(mock(Config.class)); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(new TaskName("task 0")); + when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); + when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + } + @After public void tearDown() { BaseTestFunction.reset(); @@ -220,8 +235,7 @@ public class TestOperatorImplGraph { @Test public void testEmptyChain() { StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class)); - OperatorImplGraph opGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class)); + OperatorImplGraph opGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), context, mock(Clock.class)); assertEquals(0, opGraph.getAllInputOperators().size()); } @@ -242,6 +256,7 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); Config config = new MapConfig(configs); + when(this.context.getJobContext().getConfig()).thenReturn(config); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); @@ -256,11 +271,8 @@ public class TestOperatorImplGraph { .sendTo(outputStream); }, config); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(1, inputOpImpl.registeredOperators.size()); @@ -296,6 +308,7 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); Config config = new MapConfig(configs); + when(this.context.getJobContext().getConfig()).thenReturn(config); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); @@ -312,21 +325,15 @@ public class TestOperatorImplGraph { .sendTo(outputStream); }, config); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); JobModel jobModel = mock(JobModel.class); ContainerModel containerModel = mock(ContainerModel.class); TaskModel taskModel = mock(TaskModel.class); when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel)); when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel)); when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet()); - when(mockTaskContext.getJobModel()).thenReturn(jobModel); - SamzaContainerContext containerContext = - new SamzaContainerContext("0", config, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap()); - when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext); + when(((TaskContextImpl) this.context.getTaskContext()).getJobModel()).thenReturn(jobModel); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(1, inputOpImpl.registeredOperators.size()); @@ -352,6 +359,7 @@ public class TestOperatorImplGraph { HashMap<String, String> configMap = new HashMap<>(); StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName); Config config = new MapConfig(configMap); + when(this.context.getJobContext().getConfig()).thenReturn(config); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); @@ -360,10 +368,8 @@ public class TestOperatorImplGraph { inputStream.map(mock(MapFunction.class)); }, config); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(2, inputOpImpl.registeredOperators.size()); @@ -377,10 +383,6 @@ public class TestOperatorImplGraph { public void testMergeChain() { String inputStreamId = "input"; String inputSystem = "input-system"; - String inputPhysicalName = "input-stream"; - HashMap<String, String> configs = new HashMap<>(); - StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); - Config config = new MapConfig(configs); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); @@ -390,13 +392,14 @@ public class TestOperatorImplGraph { stream1.merge(Collections.singleton(stream2)) .map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m)); }, mock(Config.class)); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); + TaskName mockTaskName = mock(TaskName.class); - when(mockTaskContext.getTaskName()).thenReturn(mockTaskName); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(mockTaskName); + when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class)); Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new, (s, op) -> addOperatorRecursively(s, op), HashSet::addAll); @@ -423,6 +426,7 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1); StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2); Config config = new MapConfig(configs); + when(this.context.getJobContext().getConfig()).thenReturn(config); Integer joinKey = new Integer(1); Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey; @@ -441,15 +445,16 @@ public class TestOperatorImplGraph { }, config); TaskName mockTaskName = mock(TaskName.class); - TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); - when(mockTaskContext.getTaskName()).thenReturn(mockTaskName); - when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(mockTaskName); + when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); + KeyValueStore mockLeftStore = mock(KeyValueStore.class); - when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore); + when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore); KeyValueStore mockRightStore = mock(KeyValueStore.class); - when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore); + when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class)); // verify that join function is initialized once. assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1); @@ -491,10 +496,12 @@ public class TestOperatorImplGraph { String inputStreamId2 = "input2"; String inputSystem = "input-system"; Config mockConfig = mock(Config.class); + TaskName mockTaskName = mock(TaskName.class); - TaskContextImpl mockContext = mock(TaskContextImpl.class); - when(mockContext.getTaskName()).thenReturn(mockTaskName); - when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(mockTaskName); + when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); @@ -510,7 +517,7 @@ public class TestOperatorImplGraph { .map(new TestMapFunction<Object, Object>("4", mapFn)); }, mockConfig); - OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance()); + OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, SystemClock.instance()); List<String> initializedOperators = BaseTestFunction.getInitListByTaskName(mockTaskName); @@ -541,6 +548,7 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0); StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1); Config config = new MapConfig(configs); + when(this.context.getJobContext().getConfig()).thenReturn(config); SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, new Partition(0)); SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, new Partition(1)); @@ -590,6 +598,7 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, outputStreamId1); StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2); Config config = new MapConfig(configs); + when(this.context.getJobContext().getConfig()).thenReturn(config); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); @@ -640,14 +649,6 @@ public class TestOperatorImplGraph { String inputSystem1 = "system1"; String inputSystem2 = "system2"; - HashMap<String, String> configs = new HashMap<>(); - configs.put(JobConfig.JOB_NAME(), "test-app"); - configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1); - StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, inputStreamId1); - StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, inputStreamId2); - StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, inputStreamId3); - Config config = new MapConfig(configs); - SystemStream input1 = new SystemStream("system1", "intput1"); SystemStream input2 = new SystemStream("system2", "intput2"); SystemStream input3 = new SystemStream("system2", "intput3"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index dc94e36..dfd8657 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -18,12 +18,10 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; @@ -69,9 +67,6 @@ public class TestSinkOperatorImpl { private SinkOperatorImpl createSinkOperator(SinkFunction<TestOutputMessageEnvelope> sinkFn) { SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - return new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext); + return new SinkOperatorImpl<>(sinkOp); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 873cd3c..ae05305 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -18,13 +18,11 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; @@ -45,8 +43,6 @@ public class TestStreamOperatorImpl { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new StreamOperatorImpl<>(mockOp); TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class); @@ -65,8 +61,6 @@ public class TestStreamOperatorImpl { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new StreamOperatorImpl<>(mockOp); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java index d8b2e8d..9083495 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java @@ -18,10 +18,10 @@ */ package org.apache.samza.operators.impl; -import java.util.Collection; - +import junit.framework.Assert; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; import org.apache.samza.operators.KV; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.functions.StreamTableJoinFunction; @@ -29,11 +29,10 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.TableSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import junit.framework.Assert; +import java.util.Collection; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -75,18 +74,16 @@ public class TestStreamTableJoinOperatorImpl { return record.getKey(); } }); - Config config = mock(Config.class); ReadableTable table = mock(ReadableTable.class); when(table.get("1")).thenReturn("r1"); when(table.get("2")).thenReturn(null); - TaskContext mockTaskContext = mock(TaskContext.class); - when(mockTaskContext.getTable(tableId)).thenReturn(table); + Context context = new MockContext(); + when(context.getTaskContext().getTable(tableId)).thenReturn(table); MessageCollector mockMessageCollector = mock(MessageCollector.class); TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class); - StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl( - mockJoinOpSpec, config, mockTaskContext); + StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(mockJoinOpSpec, context); // Table has the key Collection<TestMessageEnvelope> result; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java index 7d468c9..20d5e25 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -30,13 +30,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.samza.Partition; -import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.context.TaskContextImpl; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; @@ -67,8 +69,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -77,41 +77,41 @@ import static org.mockito.Mockito.when; public class TestWindowOperator { private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); + private Context context; private Config config; - private TaskContextImpl taskContext; @Before - public void setup() throws Exception { - config = mock(Config.class); - when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - taskContext = mock(TaskContextImpl.class); + public void setup() { + Map<String, String> configMap = new HashMap<>(); + configMap.put("job.default.system", "kafka"); + configMap.put("job.name", "jobName"); + configMap.put("job.id", "jobId"); + this.config = new MapConfig(configMap); + + this.context = new MockContext(); + when(this.context.getJobContext().getConfig()).thenReturn(this.config); Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0)))); - when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(taskContext.getStore("jobName-jobId-window-w1")) + when(taskModel.getTaskName()).thenReturn(new TaskName("task 1")); + when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); + when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.context.getTaskContext().getStore("jobName-jobId-window-w1")) .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); - - Map<String, String> mapConfig = new HashMap<>(); - mapConfig.put("job.default.system", "kafka"); - mapConfig.put("job.name", "jobName"); - mapConfig.put("job.id", "jobId"); - config = new MapConfig(mapConfig); } @Test public void testTumblingWindowsDiscardingMode() throws Exception { - OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); @@ -143,8 +143,8 @@ public class TestWindowOperator { List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); @@ -163,8 +163,7 @@ public class TestWindowOperator { @Test public void testTumblingAggregatingWindowsDiscardingMode() throws Exception { - - when(taskContext.getStore("jobName-jobId-window-w1")) + when(this.context.getTaskContext().getStore("jobName-jobId-window-w1")) .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde())); OperatorSpecGraph sgb = this.getAggregateTumblingWindowStreamGraph(AccumulationMode.DISCARDING, @@ -172,8 +171,8 @@ public class TestWindowOperator { List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage()); integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); testClock.advanceTime(Duration.ofSeconds(1)); @@ -193,8 +192,8 @@ public class TestWindowOperator { Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); @@ -222,8 +221,8 @@ public class TestWindowOperator { this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph(); TestClock testClock = new TestClock(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); @@ -267,12 +266,12 @@ public class TestWindowOperator { OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - task.init(config, taskContext); + task.init(this.context); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); @@ -299,8 +298,8 @@ public class TestWindowOperator { OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2)).getOperatorSpecGraph(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); MessageCollector messageCollector = @@ -343,8 +342,8 @@ public class TestWindowOperator { OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))).getOperatorSpecGraph(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); MessageCollector messageCollector = @@ -406,8 +405,8 @@ public class TestWindowOperator { envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); @@ -439,17 +438,18 @@ public class TestWindowOperator { EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", "integers", new Partition(0))), Collections.emptyMap()); - when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); - when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); - when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn( + endOfStreamStates); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn( + mock(WatermarkStates.class)); OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); @@ -480,16 +480,17 @@ public class TestWindowOperator { EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", "integers", new Partition(0))), Collections.emptyMap()); - when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); - when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); - when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn( + endOfStreamStates); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn( + mock(WatermarkStates.class)); OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph(); TestClock testClock = new TestClock(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); @@ -517,16 +518,17 @@ public class TestWindowOperator { EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", "integers", new Partition(0))), Collections.emptyMap()); - when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); - when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); - when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn( + endOfStreamStates); + when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn( + mock(WatermarkStates.class)); OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph(); TestClock testClock = new TestClock(); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock); - task.init(config, taskContext); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock); + task.init(this.context); MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java index 860e630..6e91e2a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java @@ -31,9 +31,9 @@ import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index e1342e3..fd4a7fb 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -23,9 +23,9 @@ import java.util.Map; import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index 41973b2..b73f8e3 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -20,16 +20,16 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.Scheduler; -import org.apache.samza.operators.functions.ScheduledFunction; -import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.SupplierFunction; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.serializers.Serde; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -38,7 +38,8 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; public class TestWindowOperatorSpec { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 93b157a..b002e2a 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -49,7 +49,8 @@ import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -465,8 +466,10 @@ public class TestStreamProcessor { @Test public void testStreamProcessorWithStreamProcessorListenerFactory() { AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>(); - StreamProcessor streamProcessor = new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), - sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class)); + StreamProcessor streamProcessor = + new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), null, null, + sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), + mock(JobCoordinator.class)); assertEquals(streamProcessor, mockListener.get().processor); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java index d483ae6..8eff4ad 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java @@ -20,8 +20,8 @@ package org.apache.samza.storage; import java.io.File; - -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.Serde; import org.apache.samza.system.SystemStreamPartition; @@ -29,9 +29,15 @@ import org.apache.samza.task.MessageCollector; public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> { @Override - public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde, - MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition, - SamzaContainerContext containerContext) { + public StorageEngine getStorageEngine(String storeName, + File storeDir, + Serde<Object> keySerde, + Serde<Object> msgSerde, + MessageCollector collector, + MetricsRegistry registry, + SystemStreamPartition changeLogSystemStreamPartition, + JobContext jobContext, + ContainerContext containerContext) { StoreProperties storeProperties = new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build(); return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition, storeProperties); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java index 42f05c0..0952a87 100644 --- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java +++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java @@ -18,25 +18,23 @@ */ package org.apache.samza.table; -import java.lang.reflect.Field; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; - +import junit.framework.Assert; import org.apache.samza.SamzaException; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.MockContext; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.StorageEngine; -import org.apache.samza.task.TaskContext; import org.junit.Test; -import junit.framework.Assert; +import java.lang.reflect.Field; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; @@ -122,11 +120,11 @@ public class TestTableManager { }); TableManager tableManager = new TableManager(new MapConfig(map), serdeMap); - tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class)); + tableManager.init(new MockContext()); for (int i = 0; i < 2; i++) { Table table = tableManager.getTable(TABLE_ID); - verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject()); + verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject()); verify(DummyTableProviderFactory.tableProvider, times(1)).getTable(); Assert.assertEquals(DummyTableProviderFactory.table, table); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index ec1c915..dc13d00 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -19,19 +19,11 @@ package org.apache.samza.table.caching; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; - +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.commons.lang3.tuple.Pair; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; @@ -45,17 +37,24 @@ import org.apache.samza.table.TableSpec; import org.apache.samza.table.caching.guava.GuavaCacheTable; import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor; import org.apache.samza.table.caching.guava.GuavaCacheTableProvider; -import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.RemoteReadWriteTable; +import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Assert; import org.junit.Test; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -139,15 +138,14 @@ public class TestCachingTable { } private void initTables(ReadableTable ... tables) { - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); - TaskContext taskContext = mock(TaskContext.class); + Context context = new MockContext(); MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - when(taskContext.getMetricsRegistry()).thenReturn(metricsRegistry); + when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry); for (ReadableTable table : tables) { - table.init(containerContext, taskContext); + table.init(context); } } @@ -160,9 +158,7 @@ public class TestCachingTable { } CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableSpec()); - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); - - TaskContext taskContext = mock(TaskContext.class); + Context context = new MockContext(); final ReadWriteTable cacheTable = getMockCache().getLeft(); final ReadWriteTable realTable = mock(ReadWriteTable.class); @@ -185,11 +181,11 @@ public class TestCachingTable { Assert.fail(); return null; - }).when(taskContext).getTable(anyString()); + }).when(context.getTaskContext()).getTable(anyString()); - when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); + when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); - tableProvider.init(containerContext, taskContext); + tableProvider.init(context); CachingTable cachingTable = (CachingTable) tableProvider.getTable(); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java index 3e844c3..571f87b 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -19,17 +19,9 @@ package org.apache.samza.table.remote; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.samza.container.SamzaContainerContext; +import junit.framework.Assert; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; @@ -38,11 +30,18 @@ import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.retry.RetriableReadFunction; import org.apache.samza.table.retry.RetriableWriteFunction; import org.apache.samza.table.retry.TableRetryPolicy; -import org.apache.samza.task.TaskContext; import org.junit.Test; import org.mockito.ArgumentCaptor; -import junit.framework.Assert; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; @@ -57,14 +56,14 @@ import static org.mockito.Mockito.verify; public class TestRemoteTable { private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); - public static TaskContext getMockTaskContext() { + public static Context getMockContext() { + Context context = new MockContext(); MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); - TaskContext taskContext = mock(TaskContext.class); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); - return taskContext; + doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry(); + return context; } private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId, @@ -89,11 +88,9 @@ public class TestRemoteTable { table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); } - TaskContext taskContext = getMockTaskContext(); - - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); + Context context = getMockContext(); - table.init(containerContext, taskContext); + table.init(context); return (T) table; }
