http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index efe1acf..f587885 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -19,13 +19,12 @@ package org.apache.samza.table.remote; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.samza.container.SamzaContainerContext; +import com.google.common.collect.ImmutableMap; import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; @@ -34,18 +33,22 @@ import org.apache.samza.table.TableSpec; import org.apache.samza.table.retry.RetriableReadFunction; import org.apache.samza.table.retry.RetriableWriteFunction; import org.apache.samza.table.retry.TableRetryPolicy; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; + import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class TestRemoteTableDescriptor { @@ -117,16 +120,24 @@ public class TestRemoteTableDescriptor { desc.getTableSpec(); } - private TaskContext createMockTaskContext() { + private Context createMockContext() { + Context context = new MockContext(); + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); - TaskContext taskContext = mock(TaskContext.class); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); - SamzaContainerContext containerCtx = new SamzaContainerContext( - "1", null, Collections.singleton(new TaskName("MyTask")), null); - doReturn(containerCtx).when(taskContext).getSamzaContainerContext(); - return taskContext; + doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry(); + + TaskName taskName = new TaskName("MyTask"); + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(taskName); + when(context.getTaskContext().getTaskModel()).thenReturn(taskModel); + + ContainerModel containerModel = mock(ContainerModel.class); + when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel)); + when(context.getContainerContext().getContainerModel()).thenReturn(containerModel); + + return context; } static class CountingCreditFunction<K, V> implements TableRateLimiter.CreditFunction<K, V> { @@ -172,7 +183,7 @@ public class TestRemoteTableDescriptor { TableSpec spec = desc.getTableSpec(); RemoteTableProvider provider = new RemoteTableProvider(spec); - provider.init(mock(SamzaContainerContext.class), createMockTaskContext()); + provider.init(createMockContext()); Table table = provider.getTable(); Assert.assertTrue(table instanceof RemoteReadWriteTable); RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java index 9dd5a74..050ea55 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -29,19 +29,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; - -import org.apache.samza.container.SamzaContainerContext; +import junit.framework.Assert; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.Table; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; import org.apache.samza.table.remote.TestRemoteTable; import org.apache.samza.table.utils.TableMetricsUtil; -import org.apache.samza.task.TaskContext; import org.junit.Test; -import junit.framework.Assert; - import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.atLeast; @@ -57,9 +54,8 @@ public class TestRetriableTableFunctions { public TableMetricsUtil getMetricsUtil(String tableId) { Table table = mock(Table.class); - SamzaContainerContext cntCtx = mock(SamzaContainerContext.class); - TaskContext taskCtx = TestRemoteTable.getMockTaskContext(); - return new TableMetricsUtil(cntCtx, taskCtx, table, tableId); + Context context = TestRemoteTable.getMockContext(); + return new TableMetricsUtil(context, table, tableId); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java index 1f71abd..13ce5f4 100644 --- a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java +++ b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java @@ -20,19 +20,21 @@ package org.apache.samza.task; import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; -public class IdentityStreamTask implements StreamTask , InitableTask { +public class IdentityStreamTask implements StreamTask, InitableTask { private int processedMessageCount = 0; private int expectedMessageCount; private String outputTopic; private String outputSystem; @Override - public void init(Config config, TaskContext taskContext) throws Exception { + public void init(Context context) throws Exception { + Config config = context.getJobContext().getConfig(); this.expectedMessageCount = config.getInt("app.messageCount"); this.outputTopic = config.get("app.outputTopic", "output"); this.outputSystem = config.get("app.outputSystem", "test-system"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 9cdbfe6..be8d344 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -28,17 +28,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.samza.Partition; import org.apache.samza.checkpoint.Checkpoint; import org.apache.samza.checkpoint.OffsetManager; -import org.apache.samza.config.Config; -import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskInstanceExceptionHandler; import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; @@ -47,13 +47,20 @@ import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; import org.junit.Rule; import org.junit.Test; - import org.junit.rules.Timeout; import scala.Option; import scala.collection.JavaConverters; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyObject; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestAsyncRunLoop { // Immutable objects shared by all test methods. @@ -77,12 +84,31 @@ public class TestAsyncRunLoop { private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1); TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) { + TaskModel taskModel = mock(TaskModel.class); + when(taskModel.getTaskName()).thenReturn(taskName); TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap()); scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet(); - return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics, - null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), - manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, - new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null); + return new TaskInstance(task, + taskModel, + taskInstanceMetrics, + null, + consumers, + mock(TaskInstanceCollector.class), + manager, + null, + null, + null, + sspSet, + new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), + null, + null, + null, + new scala.collection.immutable.HashSet<>(), + null, + mock(JobContext.class), + mock(ContainerContext.class), + Option.apply(null), + Option.apply(null)); } interface TestCode { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java index d0b820a..0538980 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java @@ -22,7 +22,7 @@ package org.apache.samza.task; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; import org.junit.Before; import org.junit.Test; @@ -64,7 +64,7 @@ public class TestAsyncStreamAdapter { } @Override - public void init(Config config, TaskContext context) throws Exception { + public void init(Context context) throws Exception { inited = true; } @@ -95,7 +95,7 @@ public class TestAsyncStreamAdapter { TestCallbackListener listener = new TestCallbackListener(); TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L); - taskAdaptor.init(null, null); + taskAdaptor.init(null); assertTrue(task.inited); taskAdaptor.processAsync(null, null, null, callback); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java index e0da2e9..da137e6 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java @@ -19,12 +19,11 @@ package org.apache.samza.task; -import org.junit.Test; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java index 1bc23d4..ab5e295 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java @@ -19,14 +19,17 @@ package org.apache.samza.task; -import org.apache.samza.config.Config; -import org.apache.samza.operators.ContextManager; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.impl.OperatorImplGraph; +import org.apache.samza.util.Clock; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestStreamOperatorTask { @@ -36,20 +39,19 @@ public class TestStreamOperatorTask { } @Test - public void testCloseDuringInitializationErrors() { - ContextManager mockContextManager = mock(ContextManager.class); - StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mockContextManager); - - doThrow(new RuntimeException("Failed to initialize context manager")) - .when(mockContextManager).init(any(), any()); - + public void testCloseDuringInitializationErrors() throws Exception { + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + doThrow(new RuntimeException("Failed to get config")).when(jobContext).getConfig(); + StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mock(Clock.class)); try { - operatorTask.init(mock(Config.class), mock(TaskContext.class)); - operatorTask.close(); - } catch (Exception e) { + operatorTask.init(context); + } catch (RuntimeException e) { if (e instanceof NullPointerException) { fail("Unexpected null pointer exception"); } } + operatorTask.close(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java index ef606c0..8559bb3 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java @@ -18,19 +18,22 @@ */ package org.apache.samza.util; -import java.lang.reflect.Field; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.task.TaskContext; +import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -206,25 +209,14 @@ public class TestEmbeddedTaggedRateLimiter { } static void initRateLimiter(RateLimiter rateLimiter) { - Config config = mock(Config.class); - TaskContext taskContext = mock(TaskContext.class); - SamzaContainerContext containerContext = mockSamzaContainerContext(); - when(taskContext.getSamzaContainerContext()).thenReturn(containerContext); - rateLimiter.init(config, taskContext); - } - - static SamzaContainerContext mockSamzaContainerContext() { - try { - Collection<String> taskNames = mock(Collection.class); - when(taskNames.size()).thenReturn(NUMBER_OF_TASKS); - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); - Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames"); - taskNamesField.setAccessible(true); - taskNamesField.set(containerContext, taskNames); - taskNamesField.setAccessible(false); - return containerContext; - } catch (Exception ex) { - throw new SamzaException(ex); - } + Context context = new MockContext(mock(Config.class)); + when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class)); + ContainerModel containerModel = mock(ContainerModel.class); + Map<TaskName, TaskModel> tasks = IntStream.range(0, NUMBER_OF_TASKS) + .mapToObj(i -> new TaskName("task-" + i)) + .collect(Collectors.toMap(Function.identity(), x -> mock(TaskModel.class))); + when(containerModel.getTasks()).thenReturn(tasks); + when(context.getContainerContext().getContainerModel()).thenReturn(containerModel); + rateLimiter.init(context); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 57c0bf0..a35366d 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -22,7 +22,8 @@ package org.apache.samza.container import java.util import java.util.concurrent.atomic.AtomicReference -import org.apache.samza.config.MapConfig +import org.apache.samza.config.{Config, MapConfig} +import org.apache.samza.context.{ApplicationContainerContext, ContainerContext} import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.server.{HttpServer, JobServlet} import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} @@ -46,7 +47,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { private val TASK_NAME = new TaskName("taskName") @Mock - private var containerContext: SamzaContainerContext = null + private var config: Config = null @Mock private var taskInstance: TaskInstance = null @Mock @@ -60,6 +61,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @Mock private var metrics: SamzaContainerMetrics = null @Mock + private var containerContext: ContainerContext = null + @Mock + private var applicationContainerContext: ApplicationContainerContext = null + @Mock private var samzaContainerListener: SamzaContainerListener = null private var samzaContainer: SamzaContainer = null @@ -67,15 +72,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @Before def setup(): Unit = { MockitoAnnotations.initMocks(this) - this.samzaContainer = new SamzaContainer( - this.containerContext, - Map(TASK_NAME -> this.taskInstance), - this.runLoop, - this.systemAdmins, - this.consumerMultiplexer, - this.producerMultiplexer, - metrics) - this.samzaContainer.setContainerListener(this.samzaContainerListener) + setupSamzaContainer(Some(this.applicationContainerContext)) when(this.metrics.containerStartupTime).thenReturn(mock[Timer]) } @@ -173,6 +170,24 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } @Test + def testApplicationContainerContext() { + val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop) + this.samzaContainer.run + orderVerifier.verify(this.applicationContainerContext).start() + orderVerifier.verify(this.runLoop).run() + orderVerifier.verify(this.applicationContainerContext).stop() + } + + @Test + def testNullApplicationContainerContextFactory() { + setupSamzaContainer(None) + this.samzaContainer.run + verify(this.runLoop).run() + // applicationContainerContext is not even wired into the container anymore, but just double check it is not used + verifyZeroInteractions(this.applicationContainerContext) + } + + @Test def testReadJobModel() { val config = new MapConfig(Map("a" -> "b").asJava) val offsets = new util.HashMap[SystemStreamPartition, String]() @@ -258,6 +273,20 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { assertEquals(Set(), SamzaContainer.getChangelogSSPsForContainer(containerModel, Map())) } + private def setupSamzaContainer(applicationContainerContext: Option[ApplicationContainerContext]) { + this.samzaContainer = new SamzaContainer( + this.config, + Map(TASK_NAME -> this.taskInstance), + this.runLoop, + this.systemAdmins, + this.consumerMultiplexer, + this.producerMultiplexer, + metrics, + containerContext = this.containerContext, + applicationContainerContextOption = applicationContainerContext) + this.samzaContainer.setContainerListener(this.samzaContainerListener) + } + class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) { var exceptionCount = 0 http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index b196131..15534cd 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -22,7 +22,8 @@ package org.apache.samza.container import org.apache.samza.Partition import org.apache.samza.checkpoint.{Checkpoint, OffsetManager} -import org.apache.samza.config.Config +import org.apache.samza.context.{TaskContext => _, _} +import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.Counter import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemConsumers, SystemStream, _} @@ -34,11 +35,12 @@ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.mockito.{Matchers, Mock, MockitoAnnotations} +import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar import scala.collection.JavaConverters._ -class TestTaskInstance extends MockitoSugar { +class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { private val SYSTEM_NAME = "test-system" private val TASK_NAME = new TaskName("taskName") private val SYSTEM_STREAM_PARTITION = @@ -48,7 +50,7 @@ class TestTaskInstance extends MockitoSugar { @Mock private var task: AllTask = null @Mock - private var config: Config = null + private var taskModel: TaskModel = null @Mock private var metrics: TaskInstanceMetrics = null @Mock @@ -60,13 +62,21 @@ class TestTaskInstance extends MockitoSugar { @Mock private var collector: TaskInstanceCollector = null @Mock - private var containerContext: SamzaContainerContext = null - @Mock private var offsetManager: OffsetManager = null @Mock private var taskStorageManager: TaskStorageManager = null // not a mock; using MockTaskInstanceExceptionHandler private var taskInstanceExceptionHandler: MockTaskInstanceExceptionHandler = null + @Mock + private var jobContext: JobContext = null + @Mock + private var containerContext: ContainerContext = null + @Mock + private var applicationContainerContext: ApplicationContainerContext = null + @Mock + private var applicationTaskContextFactory: ApplicationTaskContextFactory[ApplicationTaskContext] = null + @Mock + private var applicationTaskContext: ApplicationTaskContext = null private var taskInstance: TaskInstance = null @@ -75,19 +85,12 @@ class TestTaskInstance extends MockitoSugar { MockitoAnnotations.initMocks(this) // not using Mockito mock since Mockito doesn't work well with the call-by-name argument in maybeHandle this.taskInstanceExceptionHandler = new MockTaskInstanceExceptionHandler - this.taskInstance = new TaskInstance(this.task, - TASK_NAME, - this.config, - this.metrics, - this.systemAdmins, - this.consumerMultiplexer, - this.collector, - this.containerContext, - this.offsetManager, - storageManager = this.taskStorageManager, - systemStreamPartitions = SYSTEM_STREAM_PARTITIONS, - exceptionHandler = this.taskInstanceExceptionHandler) + when(this.taskModel.getTaskName).thenReturn(TASK_NAME) + when(this.applicationTaskContextFactory.create(Matchers.eq(this.jobContext), Matchers.eq(this.containerContext), + any(), Matchers.eq(this.applicationContainerContext))) + .thenReturn(this.applicationTaskContext) when(this.systemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(this.systemAdmin) + setupTaskInstance(Some(this.applicationTaskContextFactory)) } @Test @@ -133,10 +136,10 @@ class TestTaskInstance extends MockitoSugar { */ @Test def testManualOffsetReset() { - when(this.task.init(any(), any())).thenAnswer(new Answer[Void] { + when(this.task.init(any())).thenAnswer(new Answer[Void] { override def answer(invocation: InvocationOnMock): Void = { - val taskContext = invocation.getArgumentAt(1, classOf[TaskContext]) - taskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10") + val context = invocation.getArgumentAt(0, classOf[Context]) + context.getTaskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10") null } }) @@ -198,6 +201,35 @@ class TestTaskInstance extends MockitoSugar { verify(commitsCounter).inc() } + /** + * Given that an application task context factory is provided, then lifecycle calls should be made and the context + * should be accessible. + */ + @Test + def testApplicationTaskContextFactoryProvided(): Unit = { + assertEquals(this.applicationTaskContext, this.taskInstance.context.getApplicationTaskContext) + this.taskInstance.initTask + verify(this.applicationTaskContext).start() + verify(this.applicationTaskContext, never()).stop() + this.taskInstance.shutdownTask + verify(this.applicationTaskContext).stop() + } + + /** + * Given that no application task context factory is provided, then no lifecycle calls should be made. Also, an + * exception should be thrown if the application task context is accessed. + */ + @Test + def testNoApplicationTaskContextFactoryProvided() { + setupTaskInstance(None) + this.taskInstance.initTask + this.taskInstance.shutdownTask + verifyZeroInteractions(this.applicationTaskContext) + intercept[IllegalStateException] { + this.taskInstance.context.getApplicationTaskContext + } + } + @Test(expected = classOf[SystemProducerException]) def testProducerExceptionsIsPropagated() { when(this.metrics.commits).thenReturn(mock[Counter]) @@ -210,6 +242,24 @@ class TestTaskInstance extends MockitoSugar { } } + private def setupTaskInstance( + applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = { + this.taskInstance = new TaskInstance(this.task, + this.taskModel, + this.metrics, + this.systemAdmins, + this.consumerMultiplexer, + this.collector, + offsetManager = this.offsetManager, + storageManager = this.taskStorageManager, + systemStreamPartitions = SYSTEM_STREAM_PARTITIONS, + exceptionHandler = this.taskInstanceExceptionHandler, + jobContext = this.jobContext, + containerContext = this.containerContext, + applicationContainerContextOption = Some(this.applicationContainerContext), + applicationTaskContextFactoryOption = applicationTaskContextFactory) + } + /** * Task type which has all task traits, which can be mocked. */ http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index 0b951f4..59f8662 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -18,21 +18,25 @@ */ package org.apache.samza.processor -import java.util.Collections +import java.util +import org.apache.samza.Partition import org.apache.samza.config.MapConfig import org.apache.samza.container._ -import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.context.{ContainerContext, JobContext} +import org.apache.samza.job.model.TaskModel import org.apache.samza.serializers.SerdeManager -import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.system._ +import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.task.{StreamTask, TaskInstanceCollector} +import org.mockito.Mockito object StreamProcessorTestUtils { def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = { - val config = new MapConfig + val config = new MapConfig() val taskName = new TaskName("taskName") + val taskModel = new TaskModel(taskName, new util.HashSet[SystemStreamPartition](), new Partition(0)) val adminMultiplexer = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, @@ -41,26 +45,29 @@ object StreamProcessorTestUtils { Map[String, SystemProducer](), new SerdeManager) val collector = new TaskInstanceCollector(producerMultiplexer) - val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName), new MetricsRegistryMap) + val containerContext = Mockito.mock(classOf[ContainerContext]) val taskInstance: TaskInstance = new TaskInstance( streamTask, - taskName, - config, + taskModel, new TaskInstanceMetrics, - null, + adminMultiplexer, consumerMultiplexer, collector, - containerContext - ) + jobContext = Mockito.mock(classOf[JobContext]), + containerContext = containerContext, + applicationContainerContextOption = None, + applicationTaskContextFactoryOption = None) val container = new SamzaContainer( - containerContext = containerContext, + config = config, taskInstances = Map(taskName -> taskInstance), runLoop = mockRunloop, systemAdmins = adminMultiplexer, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics) + metrics = new SamzaContainerMetrics, + containerContext = containerContext, + applicationContainerContextOption = None) container } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala index 53147ad..e30328a 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala @@ -21,18 +21,19 @@ package org.apache.samza.storage.kv.inmemory import java.io.File -import org.apache.samza.container.SamzaContainerContext +import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.storage.kv.{KeyValueStoreMetrics, BaseKeyValueStorageEngineFactory, KeyValueStore} +import org.apache.samza.storage.kv.{BaseKeyValueStorageEngineFactory, KeyValueStore, KeyValueStoreMetrics} import org.apache.samza.system.SystemStreamPartition class InMemoryKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] { override def getKVStore(storeName: String, - storeDir: File, - registry: MetricsRegistry, - changeLogSystemStreamPartition: SystemStreamPartition, - containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = { + storeDir: File, + registry: MetricsRegistry, + changeLogSystemStreamPartition: SystemStreamPartition, + jobContext: JobContext, + containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = { val metrics = new KeyValueStoreMetrics(storeName, registry) val inMemoryDb = new InMemoryKeyValueStore (metrics) inMemoryDb http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java index 9dca23c..0734fe6 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java @@ -19,16 +19,11 @@ package org.apache.samza.storage.kv; -import java.util.ArrayList; - import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSerializerConfig; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.SerializerConfig$; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.container.TaskName; -import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; import org.apache.samza.util.Util; @@ -65,11 +60,7 @@ public class RocksDbKeyValueReader { valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName), serializerConfig); // get db options - ArrayList<TaskName> taskNameList = new ArrayList<TaskName>(); - taskNameList.add(new TaskName("read-rocks-db")); - SamzaContainerContext samzaContainerContext = - new SamzaContainerContext("0", config, taskNameList, new MetricsRegistryMap()); - Options options = RocksDbOptionsHelper.options(config, samzaContainerContext); + Options options = RocksDbOptionsHelper.options(config, 1); // open the db RocksDB.loadLibrary(); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java index 9389681..7beb066 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java @@ -20,7 +20,8 @@ package org.apache.samza.storage.kv; import org.apache.samza.config.Config; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; @@ -41,12 +42,11 @@ public class RocksDbOptionsHelper { private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes"; private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num"; - public static Options options(Config storeConfig, SamzaContainerContext containerContext) { + public static Options options(Config storeConfig, int numTasksForContainer) { Options options = new Options(); Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024); // Cache size and write buffer size are specified on a per-container basis. - int numTasks = containerContext.taskNames.size(); - options.setWriteBufferSize((int) (writeBufSize / numTasks)); + options.setWriteBufferSize((int) (writeBufSize / numTasksForContainer)); CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION; String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy"); @@ -75,7 +75,7 @@ public class RocksDbOptionsHelper { } options.setCompressionType(compressionType); - long blockCacheSize = getBlockCacheSize(storeConfig, containerContext); + long blockCacheSize = getBlockCacheSize(storeConfig, numTasksForContainer); int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096); BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize); @@ -109,9 +109,8 @@ public class RocksDbOptionsHelper { return options; } - public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) { - int numTasks = containerContext.taskNames.size(); + public static Long getBlockCacheSize(Config storeConfig, int numTasksForContainer) { long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L); - return cacheSize / numTasks; + return cacheSize / numTasksForContainer; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index 2b7ffb5..704af4a 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -20,11 +20,12 @@ package org.apache.samza.storage.kv import java.io.File -import org.apache.samza.container.SamzaContainerContext + +import org.apache.samza.config.StorageConfig._ +import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemStreamPartition import org.rocksdb.{FlushOptions, WriteOptions} -import org.apache.samza.config.StorageConfig._ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] { /** @@ -37,17 +38,19 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi * @return A valid KeyValueStore instance */ override def getKVStore(storeName: String, - storeDir: File, - registry: MetricsRegistry, - changeLogSystemStreamPartition: SystemStreamPartition, - containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = { - val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) - val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined + storeDir: File, + registry: MetricsRegistry, + changeLogSystemStreamPartition: SystemStreamPartition, + jobContext: JobContext, + containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = { + val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true) + val isLoggedStore = jobContext.getConfig.getChangelogStream(storeName).isDefined val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) + val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size() rocksDbMetrics.newGauge("rocksdb.block-cache-size", - () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext)) + () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, numTasksForContainer)) - val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext) + val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, numTasksForContainer) val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true) val rocksDb = new RocksDbKeyValueStore( http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java index 35a66e8..cd7e85c 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java @@ -18,14 +18,13 @@ */ package org.apache.samza.storage.kv; +import junit.framework.Assert; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.table.TableSpec; import org.junit.Test; -import junit.framework.Assert; - public class TestRocksDbTableDescriptor { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java index 8231905..e56c977 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -18,11 +18,11 @@ */ package org.apache.samza.storage.kv; +import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; - import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -31,15 +31,12 @@ import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; -import org.apache.samza.task.TaskContext; - -import com.google.common.base.Preconditions; /** @@ -59,13 +56,12 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide } @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - - super.init(containerContext, taskContext); + public void init(Context context) { + super.init(context); - Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables."); + Preconditions.checkNotNull(this.context, "Must specify context for local tables."); - kvStore = (KeyValueStore) taskContext.getStore(tableSpec.getId()); + kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId()); if (kvStore == null) { throw new SamzaException(String.format( @@ -81,7 +77,7 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide throw new SamzaException("Store not initialized for table " + tableSpec.getId()); } ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore); - table.init(containerContext, taskContext); + table.init(this.context); return table; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java index 9eeb55e..804df43 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java @@ -20,11 +20,9 @@ package org.apache.samza.storage.kv; import java.util.List; import java.util.concurrent.CompletableFuture; - -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.utils.DefaultTableWriteMetrics; -import org.apache.samza.task.TaskContext; /** @@ -51,9 +49,9 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - super.init(containerContext, taskContext); - writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); + public void init(Context context) { + super.init(context); + writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java index d0629c4..d440d42 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java @@ -18,15 +18,13 @@ */ package org.apache.samza.storage.kv; +import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; - -import com.google.common.base.Preconditions; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.utils.DefaultTableReadMetrics; -import org.apache.samza.task.TaskContext; /** @@ -58,8 +56,8 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); + public void init(Context context) { + readMetrics = new DefaultTableReadMetrics(context, this, tableId); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index da80560..d962e93 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -22,14 +22,14 @@ package org.apache.samza.storage.kv import java.io.File import org.apache.samza.SamzaException -import org.apache.samza.container.SamzaContainerContext +import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.Serde import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties} import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.MessageCollector -import org.apache.samza.config.MetricsConfig.Config2Metrics -import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil} +import org.apache.samza.util.HighResolutionClock /** * A key value storage engine factory implementation @@ -52,11 +52,12 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] * @param containerContext Information about the container in which the task is executing. * @return A valid KeyValueStore instance */ - def getKVStore( storeName: String, - storeDir: File, - registry: MetricsRegistry, - changeLogSystemStreamPartition: SystemStreamPartition, - containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] + def getKVStore(storeName: String, + storeDir: File, + registry: MetricsRegistry, + changeLogSystemStreamPartition: SystemStreamPartition, + jobContext: JobContext, + containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] /** * Constructs a key-value StorageEngine and returns it to the caller @@ -70,15 +71,16 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog. * @param containerContext Information about the container in which the task is executing. **/ - def getStorageEngine( storeName: String, - storeDir: File, - keySerde: Serde[K], - msgSerde: Serde[V], - collector: MessageCollector, - registry: MetricsRegistry, - changeLogSystemStreamPartition: SystemStreamPartition, - containerContext: SamzaContainerContext): StorageEngine = { - val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) + def getStorageEngine(storeName: String, + storeDir: File, + keySerde: Serde[K], + msgSerde: Serde[V], + collector: MessageCollector, + registry: MetricsRegistry, + changeLogSystemStreamPartition: SystemStreamPartition, + jobContext: JobContext, + containerContext: ContainerContext): StorageEngine = { + val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true) val storeFactory = storageConfig.get("factory") var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder() val accessLog = storageConfig.getBoolean("accesslog.enabled", false) @@ -106,7 +108,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] throw new SamzaException("Must define a message serde when using key value storage.") } - val rawStore = getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext) + val rawStore = + getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, jobContext, containerContext) // maybe wrap with logging val maybeLoggedStore = if (changeLogSystemStreamPartition == null) { @@ -141,7 +144,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] // create the storage engine and return // TODO: Decide if we should use raw bytes when restoring val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry) - val clock = if (containerContext.config.getMetricsTimerEnabled) { + val clock = if (jobContext.getConfig.getMetricsTimerEnabled) { new HighResolutionClock { override def nanoTime(): Long = System.nanoTime() } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java index 2b0166c..399f9fd 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java @@ -28,33 +28,33 @@ import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.storage.StorageEngine; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContext; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Test; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestBaseLocalStoreBackedTableProvider { @Test public void testInit() { - StorageEngine store = mock(KeyValueStorageEngine.class); - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); + Context context = mock(Context.class); TaskContext taskContext = mock(TaskContext.class); - when(taskContext.getStore(any())).thenReturn(store); - when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); + when(context.getTaskContext()).thenReturn(taskContext); + when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class)); + when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); TableSpec tableSpec = mock(TableSpec.class); when(tableSpec.getId()).thenReturn("t1"); TableProvider tableProvider = createTableProvider(tableSpec); - tableProvider.init(containerContext, taskContext); + tableProvider.init(context); Assert.assertNotNull(tableProvider.getTable()); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java new file mode 100644 index 0000000..6841e15 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.runner; + +import org.apache.samza.context.ApplicationTaskContext; +import org.apache.samza.sql.translator.TranslatorContext; + + +public class SamzaSqlApplicationContext implements ApplicationTaskContext { + private final TranslatorContext translatorContext; + + public SamzaSqlApplicationContext(TranslatorContext translatorContext) { + this.translatorContext = translatorContext; + } + + public TranslatorContext getTranslatorContext() { + return translatorContext; + } + + @Override + public void start() { + } + + @Override + public void stop() { + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java index f33c5ca..77a24f8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java @@ -21,14 +21,13 @@ package org.apache.samza.sql.translator; import java.util.Arrays; import java.util.Collections; - import org.apache.calcite.rel.logical.LogicalFilter; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.task.TaskContext; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +52,8 @@ class FilterTranslator { } @Override - public void init(Config config, TaskContext context) { - this.context = (TranslatorContext) context.getUserContext(); + public void init(Context context) { + this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext(); this.filter = (LogicalFilter) this.context.getRelNode(filterId); this.expr = this.context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition())); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java index 965338f..435a2cc 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java @@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.TableModify; import org.apache.commons.lang.Validate; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; @@ -39,8 +39,8 @@ import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.apache.samza.table.Table; -import org.apache.samza.task.TaskContext; /** @@ -70,9 +70,10 @@ class ModifyTranslator { } @Override - public void init(Config config, TaskContext taskContext) { - TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); - this.samzaMsgConverter = context.getMsgConverter(outputTopic); + public void init(Context context) { + TranslatorContext translatorContext = + ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext(); + this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index 8e6f687..9a1ff84 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -31,12 +31,12 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.task.TaskContext; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +61,8 @@ class ProjectTranslator { } @Override - public void init(Config config, TaskContext taskContext) { - this.context = (TranslatorContext) taskContext.getUserContext(); + public void init(Context context) { + this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext(); this.project = (Project) this.context.getRelNode(projectId); this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects()); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index 3a35b97..b13043f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -31,16 +31,17 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.config.Config; -import org.apache.samza.operators.ContextManager; +import org.apache.samza.context.Context; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.sql.data.SamzaSqlExecutionContext; +import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.planner.QueryPlanner; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.task.TaskContext; - /** * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries. @@ -72,8 +73,8 @@ public class QueryTranslator { public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) { final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig); - final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters); - final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver(); + final TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, this.converters); + final SqlIOResolver ioResolver = translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver(); final RelNode node = relRoot.project(); node.accept(new RelShuttleImpl() { @@ -93,28 +94,28 @@ public class QueryTranslator { throw new SamzaException("Not a supported operation: " + modify.toString()); } RelNode node = super.visit(modify); - modifyTranslator.translate(modify, context); + modifyTranslator.translate(modify, translatorContext); return node; } @Override public RelNode visit(TableScan scan) { RelNode node = super.visit(scan); - scanTranslator.translate(scan, context); + scanTranslator.translate(scan, translatorContext); return node; } @Override public RelNode visit(LogicalFilter filter) { RelNode node = visitChild(filter, 0, filter.getInput()); - new FilterTranslator().translate(filter, context); + new FilterTranslator().translate(filter, translatorContext); return node; } @Override public RelNode visit(LogicalProject project) { RelNode node = super.visit(project); - new ProjectTranslator().translate(project, context); + new ProjectTranslator().translate(project, translatorContext); return node; } @@ -122,7 +123,7 @@ public class QueryTranslator { public RelNode visit(LogicalJoin join) { RelNode node = super.visit(join); joinId++; - new JoinTranslator(joinId, ioResolver).translate(join, context); + new JoinTranslator(joinId, ioResolver).translate(join, translatorContext); return node; } @@ -130,23 +131,21 @@ public class QueryTranslator { public RelNode visit(LogicalAggregate aggregate) { RelNode node = super.visit(aggregate); windowId++; - new LogicalAggregateTranslator(windowId).translate(aggregate, context); + new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext); return node; } }); - appDesc.withContextManager(new ContextManager() { - @Override - public void init(Config config, TaskContext taskContext) { - taskContext.setUserContext(context.clone()); - } - - @Override - public void close() { - - } - - }); - + /* + * TODO When serialization of ApplicationDescriptor is actually needed, then something will need to be updated here, + * since translatorContext is not Serializable. Currently, a new ApplicationDescriptor instance is created in each + * container, so it does not need to be serialized. Therefore, the translatorContext is recreated in each container + * and does not need to be serialized. + */ + appDesc.withApplicationTaskContextFactory((jobContext, + containerContext, + taskContext, + applicationContainerContext) -> + new SamzaSqlApplicationContext(translatorContext.clone())); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 771a5d5..be94160 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.calcite.rel.core.TableScan; import org.apache.commons.lang.Validate; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; @@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.task.TaskContext; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; /** @@ -64,9 +64,10 @@ class ScanTranslator { } @Override - public void init(Config config, TaskContext taskContext) { - TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); - this.msgConverter = context.getMsgConverter(streamName); + public void init(Context context) { + TranslatorContext translatorContext = + ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext(); + this.msgConverter = translatorContext.getMsgConverter(streamName); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java index 2005c21..ec0a993 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java @@ -22,7 +22,6 @@ package org.apache.samza.sql.e2e; import java.util.Arrays; import java.util.List; import java.util.Map; - import org.apache.samza.config.MapConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java index 1ac804e..f0df3a9 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java @@ -20,14 +20,12 @@ package org.apache.samza.sql.runner; import java.util.Map; - import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; - import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 458196f..fd811cd 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; - import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.samza.config.Config; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index 8318e8a..4c78b5a 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; - import org.apache.commons.lang.NotImplementedException; import org.apache.samza.config.Config; import org.apache.samza.operators.BaseTableDescriptor; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java index a84f347..dd98b92 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java @@ -23,8 +23,6 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.util.List; - -import org.apache.samza.sql.testutil.SqlFileParser; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java index e7c2195..07ebe33 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java @@ -21,14 +21,11 @@ package org.apache.samza.sql.translator; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import org.apache.calcite.DataContext; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.config.Config; -import org.apache.samza.container.TaskContextImpl; -import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FilterFunction; @@ -38,6 +35,7 @@ import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.RexToJavaCompiler; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.internal.util.reflection.Whitebox; @@ -50,8 +48,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -96,11 +94,9 @@ public class TestFilterTranslator extends TranslatorTestBase { assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER); // Verify that the describe() method will establish the context for the filter function - Config mockConfig = mock(Config.class); - TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, - new HashSet<>(), null, null, null, null, null, null); - taskContext.setUserContext(mockContext); - filterSpec.getTransformFn().init(mockConfig, taskContext); + Context context = mock(Context.class); + when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext)); + filterSpec.getTransformFn().init(context); FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn"); assertNotNull(filterFn); assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context")); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java index 3046c1f..2ed7a00 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -21,7 +21,6 @@ package org.apache.samza.sql.translator; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import org.apache.calcite.DataContext; import org.apache.calcite.rel.RelNode; @@ -33,9 +32,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.calcite.util.Pair; import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.config.Config; -import org.apache.samza.container.TaskContextImpl; -import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.MapFunction; @@ -47,6 +44,7 @@ import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.RexToJavaCompiler; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.internal.util.reflection.Whitebox; @@ -58,8 +56,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -114,11 +112,9 @@ public class TestProjectTranslator extends TranslatorTestBase { assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP); // Verify that the bootstrap() method will establish the context for the map function - Config mockConfig = mock(Config.class); - TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, - new HashSet<>(), null, null, null, null, null, null); - taskContext.setUserContext(mockContext); - projectSpec.getTransformFn().init(mockConfig, taskContext); + Context context = mock(Context.class); + when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext)); + projectSpec.getTransformFn().init(context); MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn"); assertNotNull(mapFn); assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context")); @@ -249,11 +245,9 @@ public class TestProjectTranslator extends TranslatorTestBase { assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP); // Verify that the describe() method will establish the context for the map function - Config mockConfig = mock(Config.class); - TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, - new HashSet<>(), null, null, null, null, null, null); - taskContext.setUserContext(mockContext); - projectSpec.getTransformFn().init(mockConfig, taskContext); + Context context = mock(Context.class); + when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext)); + projectSpec.getTransformFn().init(context); MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn"); assertNotNull(mapFn); assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context")); @@ -285,5 +279,4 @@ public class TestProjectTranslator extends TranslatorTestBase { }}); } - }
