Repository: samza Updated Branches: refs/heads/master d2c9e8162 -> 9d2d49e9e
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index e5d3659..7a194db 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -21,7 +21,6 @@ package org.apache.samza.sql.translator; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -30,13 +29,13 @@ import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.container.TaskContextImpl; -import org.apache.samza.container.TaskName; +import org.apache.samza.context.ApplicationTaskContext; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; @@ -45,7 +44,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; -import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*; +import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchQueryInfo; +import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchSqlFromConfig; +import static org.junit.Assert.assertTrue; public class TestQueryTranslator { @@ -53,11 +54,11 @@ public class TestQueryTranslator { // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) { Assert.assertNotEquals(originContext, clonedContext); - Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler()); - Assert.assertTrue(originContext.getStreamAppDescriptor() == clonedContext.getStreamAppDescriptor()); - Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters")); - Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams")); - Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes")); + assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler()); + assertTrue(originContext.getStreamAppDescriptor() == clonedContext.getStreamAppDescriptor()); + assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters")); + assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams")); + assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes")); Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext()); validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext()); } @@ -65,11 +66,11 @@ public class TestQueryTranslator { private void validateClonedExecutionContext(SamzaSqlExecutionContext originContext, SamzaSqlExecutionContext clonedContext) { Assert.assertNotEquals(originContext, clonedContext); - Assert.assertTrue( + assertTrue( Whitebox.getInternalState(originContext, "sqlConfig") == Whitebox.getInternalState(clonedContext, "sqlConfig")); - Assert.assertTrue(Whitebox.getInternalState(originContext, "udfMetadata") == Whitebox.getInternalState(clonedContext, + assertTrue(Whitebox.getInternalState(originContext, "udfMetadata") == Whitebox.getInternalState(clonedContext, "udfMetadata")); - Assert.assertTrue(Whitebox.getInternalState(originContext, "udfInstances") != Whitebox.getInternalState(clonedContext, + assertTrue(Whitebox.getInternalState(originContext, "udfInstances") != Whitebox.getInternalState(clonedContext, "udfInstances")); } @@ -121,18 +122,14 @@ public class TestQueryTranslator { private void validatePerTaskContextInit(StreamApplicationDescriptorImpl appDesc, Config samzaConfig) { // make sure that each task context would have a separate instance of cloned TranslatorContext - TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 1"), null, null, - new HashSet<>(), null, null, null, null, null, null); - // call ContextManager.bootstrap() to instantiate the per-task TranslatorContext - appDesc.getContextManager().init(samzaConfig, testContext); - Assert.assertNotNull(testContext.getUserContext()); - Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext); - TranslatorContext contextPerTaskOne = (TranslatorContext) testContext.getUserContext(); - // call ContextManager.bootstrap() second time to instantiate another clone of TranslatorContext - appDesc.getContextManager().init(samzaConfig, testContext); - Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext); - // validate the two copies of TranslatorContext are clones of each other - validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) testContext.getUserContext()); + ApplicationTaskContext contextPerTaskOne = + appDesc.getApplicationTaskContextFactory().get().create(null, null, null, null); + ApplicationTaskContext contextPerTaskTwo = + appDesc.getApplicationTaskContextFactory().get().create(null, null, null, null); + assertTrue(contextPerTaskOne instanceof SamzaSqlApplicationContext); + assertTrue(contextPerTaskTwo instanceof SamzaSqlApplicationContext); + validateClonedTranslatorContext(((SamzaSqlApplicationContext) contextPerTaskOne).getTranslatorContext(), + ((SamzaSqlApplicationContext) contextPerTaskTwo).getTranslatorContext()); } @Test @@ -734,7 +731,7 @@ public class TestQueryTranslator { Assert.assertEquals(1, specGraph.getInputOperators().size()); Assert.assertEquals(1, specGraph.getOutputStreams().size()); - Assert.assertTrue(specGraph.hasWindowOrJoins()); + assertTrue(specGraph.hasWindowOrJoins()); Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs(); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java index dfc4b42..e991c4e 100644 --- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -38,7 +39,6 @@ import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaOutputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.CommandLine; @@ -115,8 +115,9 @@ public class KeyValueStoreExample implements StreamApplication { } @Override - public void init(Config config, TaskContext context) { - this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store"); + public void init(Context context) { + this.statsStore = + (KeyValueStore<String, StatsWindowState>) context.getTaskContext().getStore("my-stats-wnd-store"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java index 1a1c24c..de98feb 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java @@ -21,9 +21,7 @@ package org.apache.samza.test.framework; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import java.io.IOException; -import java.io.ObjectInputStream; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.serializers.KVSerde; @@ -31,10 +29,11 @@ import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.hamcrest.Matchers; +import java.io.IOException; +import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -148,10 +147,12 @@ class MessageStreamAssert<M> { } @Override - public void init(Config config, TaskContext context) { - final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null); + public void init(Context context) { + final SystemStreamPartition ssp = + Iterables.getFirst(context.getTaskContext().getTaskModel().getSystemStreamPartitions(), null); if (ssp != null || ssp.getPartition().getPartitionId() == 0) { - final int count = checkEachTask ? context.getSamzaContainerContext().taskNames.size() : 1; + final int count = + checkEachTask ? context.getContainerContext().getContainerModel().getTasks().keySet().size() : 1; LATCHES.put(id, new CountDownLatch(count)); timer.schedule(timerTask, TIMEOUT); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java index 4309d92..80057d4 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java @@ -19,15 +19,14 @@ package org.apache.samza.test.integration; -import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.TaskCoordinator.RequestScope; import org.apache.samza.util.StreamUtil; @@ -53,9 +52,9 @@ public class NegateNumberTask implements StreamTask, InitableTask { private SystemStream outputSystemStream; @Override - public void init(Config config, TaskContext context) throws Exception { - maxMessages = config.getInt("task.max.messages", 50); - String outputSystemStreamString = config.get("task.outputs", null); + public void init(Context context) throws Exception { + maxMessages = context.getJobContext().getConfig().getInt("task.max.messages", 50); + String outputSystemStreamString = context.getJobContext().getConfig().get("task.outputs", null); if (outputSystemStreamString == null) { throw new ConfigException("Missing required configuration: task.outputs"); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java index 6fafabc..213fc71 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java @@ -19,7 +19,7 @@ package org.apache.samza.test.integration; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -27,22 +27,21 @@ import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.TaskCoordinator.RequestScope; /** * This is a simple task that writes each message to a state store and prints them all out on reload. - * + * * It is useful for command line testing with the kafka console producer and consumer and text messages. */ public class SimpleStatefulTask implements StreamTask, InitableTask { - + private KeyValueStore<String, String> store; @SuppressWarnings("unchecked") - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, String>) context.getStore("mystore"); + public void init(Context context) { + this.store = (KeyValueStore<String, String>) context.getTaskContext().getStore("mystore"); System.out.println("Contents of store: "); KeyValueIterator<String, String> iter = store.all(); while (iter.hasNext()) { @@ -51,7 +50,7 @@ public class SimpleStatefulTask implements StreamTask, InitableTask { } iter.close(); } - + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { System.out.println("Adding " + envelope.getMessage() + " => " + envelope.getMessage() + " to the store."); store.put((String) envelope.getMessage(), (String) envelope.getMessage()); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java index 1d524a8..a637ea0 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java @@ -19,13 +19,12 @@ package org.apache.samza.test.integration; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.TaskCoordinator.RequestScope; @@ -41,10 +40,10 @@ public class StatePerfTestTask implements StreamTask, InitableTask { private long start = System.currentTimeMillis(); @SuppressWarnings("unchecked") - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, String>) context.getStore("mystore"); + public void init(Context context) { + this.store = (KeyValueStore<String, String>) context.getTaskContext().getStore("mystore"); } - + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { store.put((String) envelope.getMessage(), (String) envelope.getMessage()); count++; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java index ea3aeb1..0238d23 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java @@ -19,7 +19,7 @@ package org.apache.samza.test.integration.join; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -29,14 +29,13 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.WindowableTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Checker implements StreamTask, WindowableTask, InitableTask { - + private static Logger logger = LoggerFactory.getLogger(Checker.class); private static final String CURRENT_EPOCH = "current-epoch"; @@ -44,13 +43,13 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { private KeyValueStore<String, String> store; private int expectedKeys; private int numPartitions; - + @Override @SuppressWarnings("unchecked") - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, String>) context.getStore("checker-state"); - this.expectedKeys = config.getInt("expected.keys"); - this.numPartitions = config.getInt("num.partitions"); + public void init(Context context) { + this.store = (KeyValueStore<String, String>) context.getTaskContext().getStore("checker-state"); + this.expectedKeys = context.getJobContext().getConfig().getInt("expected.keys"); + this.numPartitions = context.getJobContext().getConfig().getInt("num.partitions"); } @Override @@ -61,7 +60,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { checkEpoch(epoch); this.store.put(key, epoch); } - + @Override public void window(MessageCollector collector, TaskCoordinator coordinator) { String currentEpoch = this.store.get(CURRENT_EPOCH); @@ -93,7 +92,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { logger.info("Only found " + count + " valid keys, try again later."); } } - + private void checkEpoch(String epoch) { String curr = this.store.get(CURRENT_EPOCH); if (curr == null) http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java index 6661d8e..38ef6e1 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java @@ -19,8 +19,8 @@ package org.apache.samza.test.integration.join; -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -28,7 +28,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.TaskCoordinator.RequestScope; import org.apache.samza.task.WindowableTask; @@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class Emitter implements StreamTask, InitableTask, WindowableTask { - + private static Logger logger = LoggerFactory.getLogger(Emitter.class); - + private static final String EPOCH = "the-epoch"; private static final String COUNT = "the-count"; @@ -49,10 +48,10 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { private TaskName taskName; @Override - public void init(Config config, TaskContext context) { - this.state = (KeyValueStore<String, String>) context.getStore("emitter-state"); - this.taskName = context.getTaskName(); - this.max = config.getInt("count"); + public void init(Context context) { + this.state = (KeyValueStore<String, String>) context.getTaskContext().getStore("emitter-state"); + this.taskName = context.getTaskContext().getTaskModel().getTaskName(); + this.max = context.getJobContext().getConfig().getInt("count"); } @Override @@ -66,7 +65,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { return; if (newEpoch < epoch) throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch); - + // it's a new era, reset current epoch and count logger.info("Epoch: " + newEpoch); this.state.put(EPOCH, Integer.toString(newEpoch)); @@ -74,7 +73,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER); } } - + public void window(MessageCollector collector, TaskCoordinator coordinator) { Integer epoch = getInt(EPOCH); if (epoch == null) { @@ -89,13 +88,13 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1)); } } - + private void resetEpoch() { logger.info("Resetting epoch to 0"); state.put(EPOCH, "0"); state.put(COUNT, "0"); } - + private Integer getInt(String key) { String value = this.state.get(key); return value == null ? null : Integer.parseInt(value); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java index d1dd1f8..d006bdd 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java @@ -21,9 +21,8 @@ package org.apache.samza.test.integration.join; import java.util.HashSet; import java.util.Set; - -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -31,25 +30,24 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class Joiner implements StreamTask, InitableTask { - + private static Logger logger = LoggerFactory.getLogger(Joiner.class); - + private KeyValueStore<String, String> store; private int expected; private TaskName taskName; @Override - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, String>) context.getStore("joiner-state"); - this.expected = config.getInt("num.partitions"); - this.taskName = context.getTaskName(); + public void init(Context context) { + this.store = (KeyValueStore<String, String>) context.getTaskContext().getStore("joiner-state"); + this.expected = context.getJobContext().getConfig().getInt("num.partitions"); + this.taskName = context.getTaskContext().getTaskModel().getTaskName(); } @Override @@ -83,7 +81,7 @@ public class Joiner implements StreamTask, InitableTask { this.store.put(key, partitions.toString()); logger.info("Join store in Task " + this.taskName + " " + key + " -> " + partitions.toString()); } - + private Partitions loadPartitions(int epoch, String key) { String current = this.store.get(key); Partitions partitions; @@ -93,16 +91,16 @@ public class Joiner implements StreamTask, InitableTask { partitions = Partitions.parse(current); return partitions; } - + private static class Partitions { int epoch; Set<Integer> partitions; - + public Partitions(int epoch, Set<Integer> partitions) { this.epoch = epoch; this.partitions = partitions; } - + public static Partitions parse(String s) { String[] pieces = s.split("\\|", -1); int epoch = Integer.parseInt(pieces[1]); @@ -111,7 +109,7 @@ public class Joiner implements StreamTask, InitableTask { set.add(Integer.parseInt(pieces[i])); return new Partitions(epoch, set); } - + public String toString() { StringBuilder b = new StringBuilder("|"); b.append(epoch); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java index 4a299b6..b8ab073 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java @@ -19,31 +19,30 @@ package org.apache.samza.test.integration.join; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.WindowableTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Watcher implements StreamTask, WindowableTask, InitableTask { - + private static Logger logger = LoggerFactory.getLogger(Watcher.class); private boolean inError = false; private long lastEpochChange = System.currentTimeMillis(); private long maxTimeBetweenEpochsMs; private int currentEpoch = 0; - + @Override - public void init(Config config, TaskContext context) { - this.maxTimeBetweenEpochsMs = config.getLong("max.time.between.epochs.ms"); + public void init(Context context) { + this.maxTimeBetweenEpochsMs = context.getJobContext().getConfig().getLong("max.time.between.epochs.ms"); } - + @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { int epoch = Integer.parseInt((String) envelope.getMessage()); @@ -54,7 +53,7 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask { this.inError = false; } } - + @Override public void window(MessageCollector collector, TaskCoordinator coordinator) { boolean isLagging = System.currentTimeMillis() - lastEpochChange > maxTimeBetweenEpochsMs; @@ -64,5 +63,5 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask { logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60 * 1000))); } } - + } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 36c86cc..1c2b333 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -20,19 +20,20 @@ package org.apache.samza.test.performance import java.io.File -import java.util -import java.util.UUID import java.util.concurrent.TimeUnit +import java.util.{Collections, UUID} import com.google.common.base.Stopwatch import org.apache.samza.config.Config import org.apache.samza.config.StorageConfig._ -import org.apache.samza.container.{SamzaContainerContext, TaskName} +import org.apache.samza.container.TaskName +import org.apache.samza.context.{ContainerContextImpl, JobContextImpl} +import org.apache.samza.job.model.{ContainerModel, TaskModel} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde} import org.apache.samza.storage.StorageEngineFactory import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore} -import org.apache.samza.system.{SystemProducer, SystemProducers} +import org.apache.samza.system.{SystemProducer, SystemProducers, SystemStreamPartition} import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.{CommandLine, FileUtil, Logging, Util} import org.apache.samza.{Partition, SamzaException} @@ -84,9 +85,14 @@ object TestKeyValuePerformance extends Logging { } def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) { - val taskNames = new java.util.ArrayList[TaskName]() val partitionCount = config.getInt("partition.count", 1) - (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString))) + val tasks = (0 until partitionCount) + .map(i => new Partition(i)) + .map(partition => (new TaskName(partition.toString), + new TaskModel(new TaskName(partition.toString), + Collections.singleton(new SystemStreamPartition("system", "stream", partition)), + partition))) + .toMap val producerMultiplexer = new SystemProducers( Map[String, SystemProducer](), @@ -116,7 +122,8 @@ object TestKeyValuePerformance extends Logging { new TaskInstanceCollector(producerMultiplexer), new MetricsRegistryMap, null, - new SamzaContainerContext("0", config, taskNames, new MetricsRegistryMap) + JobContextImpl.fromConfigWithDefaults(config), + new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap) ) val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala index 99d047d..5ab8c2c 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala @@ -19,21 +19,15 @@ package org.apache.samza.test.performance -import org.apache.samza.task.TaskContext -import org.apache.samza.task.InitableTask -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.task.MessageCollector -import org.apache.samza.task.StreamTask -import org.apache.samza.task.TaskCoordinator +import org.apache.samza.context.Context +import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream} import org.apache.samza.task.TaskCoordinator.RequestScope -import org.apache.samza.config.Config +import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskCoordinator} import org.apache.samza.util.{Logging, StreamUtil} -import org.apache.samza.system.SystemStream -import org.apache.samza.system.OutgoingMessageEnvelope object TestPerformanceTask { - // No thread safety is needed for these variables because they're mutated in + // No thread safety is needed for these variables because they're mutated in // the process method, which is single threaded. var messagesProcessed = 0 var startTime = 0L @@ -59,7 +53,7 @@ object TestPerformanceTask { * <pre> * task.outputs=kafka.MyOutputTopic * <pre> - * + * * If undefined, the task simply drops incoming messages, rather than * forwarding them to the output stream. */ @@ -82,7 +76,8 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging { */ var outputSystemStream: Option[SystemStream] = None - def init(config: Config, context: TaskContext) { + def init(context: Context) { + val config = context.getJobContext.getConfig logInterval = config.getInt("task.log.interval", 10000) maxMessages = config.getInt("task.max.messages", 10000000) outputSystemStream = Option(config.get("task.outputs", null)).map(StreamUtil.getSystemStreamFromNames) http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index 66cf061..585af0f 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -44,6 +44,7 @@ import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; +import org.apache.samza.context.Context; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.runtime.ProcessorLifecycleListener; @@ -54,7 +55,6 @@ import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; @@ -340,8 +340,8 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness protected String processorIdToFail; @Override - public void init(Config config, TaskContext taskContext) - throws Exception { + public void init(Context context) { + Config config = context.getJobContext().getConfig(); this.processorId = config.get(ApplicationConfig.PROCESSOR_ID); this.outputTopic = config.get("app.outputTopic", "output"); this.outputSystem = config.get("app.outputSystem", "test-system"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java index db78e8c..1644a0f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java @@ -26,8 +26,8 @@ import java.util.Collections; import java.util.List; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.serializers.JsonSerdeV2; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java b/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java index 4f1d1df..16e8777 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java @@ -20,13 +20,13 @@ package org.apache.samza.test.processor; import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -37,7 +37,8 @@ public class IdentityStreamTask implements StreamTask , InitableTask { private String outputSystem; @Override - public void init(Config config, TaskContext taskContext) throws Exception { + public void init(Context context) throws Exception { + Config config = context.getJobContext().getConfig(); this.expectedMessageCount = config.getInt("app.messageCount"); this.outputTopic = config.get("app.outputTopic", "output"); this.outputSystem = config.get("app.outputSystem", "test-system"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index fc62b0a..e7040ca 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -52,7 +52,10 @@ import org.junit.Assert; import org.junit.Test; import scala.Option$; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; public class TestStreamProcessor extends StandaloneIntegrationTestHarness { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index da8af9e..d24cf57 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -28,13 +28,13 @@ import java.util.Map; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.TaskApplication; import org.apache.samza.application.TaskApplicationDescriptor; -import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; -import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContext; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; @@ -61,7 +61,6 @@ import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.ArraySystemFactory; @@ -69,8 +68,12 @@ import org.apache.samza.test.util.Base64Serializer; import org.junit.Assert; import org.junit.Test; -import static org.apache.samza.test.table.TestTableData.*; - +import static org.apache.samza.test.table.TestTableData.EnrichedPageView; +import static org.apache.samza.test.table.TestTableData.PageView; +import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde; +import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; +import static org.apache.samza.test.table.TestTableData.Profile; +import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -80,6 +83,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * This test class tests sendTo() and join() for local tables @@ -333,11 +338,11 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { private transient ReadableTable table; @Override - public void init(Config config, TaskContext context) { - table = (ReadableTable) context.getTable("t1"); + public void init(Context context) { + table = (ReadableTable) context.getTaskContext().getTable("t1"); this.received = new ArrayList<>(); - taskToMapFunctionMap.put(context.getTaskName().getTaskName(), this); + taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this); } @Override @@ -355,16 +360,16 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { public void testAsyncOperation() throws Exception { KeyValueStore kvStore = mock(KeyValueStore.class); LocalStoreBackedReadWriteTable<String, String> table = new LocalStoreBackedReadWriteTable<>("table1", kvStore); + Context context = mock(Context.class); TaskContext taskContext = mock(TaskContext.class); + when(context.getTaskContext()).thenReturn(taskContext); MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); - - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); + doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry(); - table.init(containerContext, taskContext); + table.init(context); // GET doReturn("bar").when(kvStore).get(anyString()); @@ -423,8 +428,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { static public class MyStreamTask implements StreamTask, InitableTask { private ReadWriteTable<Integer, PageView> pageViewTable; @Override - public void init(Config config, TaskContext context) throws Exception { - pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTable("t1"); + public void init(Context context) throws Exception { + pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1"); } @Override public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index c31052d..2fa00fe 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -20,17 +20,9 @@ package org.apache.samza.test.table; import com.google.common.collect.ImmutableList; - -import java.nio.file.FileSystems; -import java.time.Duration; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; @@ -51,7 +43,18 @@ import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Test; -import static org.apache.samza.test.table.TestTableData.*; +import java.nio.file.FileSystems; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.samza.test.table.TestTableData.EnrichedPageView; +import static org.apache.samza.test.table.TestTableData.PageView; +import static org.apache.samza.test.table.TestTableData.Profile; +import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index e23cb58..a48bb7f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -34,17 +34,18 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.MapConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContext; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.table.Table; @@ -56,18 +57,22 @@ import org.apache.samza.table.remote.RemoteTableDescriptor; import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; -import org.apache.samza.task.TaskContext; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.Base64Serializer; import org.apache.samza.util.RateLimiter; import org.junit.Assert; import org.junit.Test; -import static org.apache.samza.test.table.TestTableData.*; +import static org.apache.samza.test.table.TestTableData.EnrichedPageView; +import static org.apache.samza.test.table.TestTableData.PageView; +import static org.apache.samza.test.table.TestTableData.Profile; +import static org.apache.samza.test.table.TestTableData.generatePageViews; +import static org.apache.samza.test.table.TestTableData.generateProfiles; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; @@ -239,12 +244,14 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache"); } - private TaskContext createMockTaskContext() { + private Context createMockContext() { MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString()); doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString()); - TaskContext context = mock(TaskContext.class); - doReturn(metricsRegistry).when(context).getMetricsRegistry(); + Context context = mock(Context.class); + TaskContext taskContext = mock(TaskContext.class); + when(context.getTaskContext()).thenReturn(taskContext); + doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry(); return context; } @@ -257,7 +264,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class); RemoteReadableTable<String, ?> table = new RemoteReadableTable<>( "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null); - table.init(mock(SamzaContainerContext.class), createMockTaskContext()); + table.init(createMockContext()); table.get("abc"); } @@ -271,7 +278,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class); RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>( "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null); - table.init(mock(SamzaContainerContext.class), createMockTaskContext()); + table.init(createMockContext()); table.put("abc", "efg"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 6186ca7..f868fdc 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; - import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; @@ -50,7 +49,7 @@ import org.apache.samza.util.Util; import org.junit.Assert; import org.junit.Test; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 8405c63..d248dac 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -22,8 +22,8 @@ package org.apache.samza.test.integration import java.util import java.util.Properties import java.util.concurrent.{CountDownLatch, TimeUnit} -import javax.security.auth.login.Configuration +import javax.security.auth.login.Configuration import kafka.admin.AdminUtils import kafka.consumer.{Consumer, ConsumerConfig} import kafka.message.MessageAndMetadata @@ -31,13 +31,14 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{CoreUtils, TestUtils, ZkUtils} import kafka.zk.EmbeddedZookeeper import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} -import org.apache.samza.Partition -import org.apache.samza.checkpoint.Checkpoint import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.JaasUtils +import org.apache.samza.Partition +import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.config._ import org.apache.samza.container.TaskName -import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory} +import org.apache.samza.context.Context +import org.apache.samza.job.local.ThreadJobFactory import org.apache.samza.job.model.{ContainerModel, JobModel} import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob} import org.apache.samza.metrics.MetricsRegistryMap @@ -45,7 +46,7 @@ import org.apache.samza.storage.ChangelogStreamManager import org.apache.samza.system.kafka.TopicMetadataCache import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition} import org.apache.samza.task._ -import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore, Util} +import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore} import org.junit.Assert._ import scala.collection.JavaConverters._ @@ -336,9 +337,9 @@ abstract class TestTask extends StreamTask with InitableTask { val eventProcessed = new CountDownLatch(1) @volatile var gotMessage = new CountDownLatch(1) - def init(config: Config, context: TaskContext) { - TestTask.register(context.getTaskName, this) - testInit(config, context) + def init(context: Context) { + TestTask.register(context.getTaskContext.getTaskModel.getTaskName, this) + testInit(context) initFinished.countDown() } @@ -363,7 +364,7 @@ abstract class TestTask extends StreamTask with InitableTask { gotMessage = new CountDownLatch(1) } - def testInit(config: Config, context: TaskContext) + def testInit(context: Context) def testProcess(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala index ccb7cd4..4de0260 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala @@ -19,10 +19,10 @@ package org.apache.samza.test.integration -import org.apache.samza.config.Config +import org.apache.samza.context.Context import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator} +import org.apache.samza.task.{MessageCollector, TaskCoordinator} import org.junit.Assert._ import org.junit.{AfterClass, BeforeClass, Test} @@ -112,8 +112,8 @@ class ShutdownStateStoreTask extends TestTask { var store: KeyValueStore[String, String] = null var restored = scala.collection.mutable.Map[String, String]() - override def testInit(config: Config, context: TaskContext) { - store = context + override def testInit(context: Context) { + store = context.getTaskContext .getStore(TestShutdownStatefulTask.STORE_NAME) .asInstanceOf[KeyValueStore[String, String]] val iter = store.all http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index ccd5eaa..cf6d4fe 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -19,11 +19,11 @@ package org.apache.samza.test.integration -import org.apache.samza.config.Config +import org.apache.samza.context.Context import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.TaskCoordinator.RequestScope -import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator} +import org.apache.samza.task.{MessageCollector, TaskCoordinator} import org.junit.Assert._ import org.junit.{AfterClass, BeforeClass, Test} @@ -146,8 +146,8 @@ class StateStoreTestTask extends TestTask { var store: KeyValueStore[String, String] = null var restored = Set[String]() - override def testInit(config: Config, context: TaskContext): Unit = { - store = context.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]] + override def testInit(context: Context): Unit = { + store = context.getTaskContext.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]] val iter = store.all restored ++= iter .asScala
