Repository: samza
Updated Branches:
  refs/heads/master d2c9e8162 -> 9d2d49e9e


http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index e5d3659..7a194db 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -30,13 +29,13 @@ import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ApplicationTaskContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
@@ -45,7 +44,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchQueryInfo;
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchSqlFromConfig;
+import static org.junit.Assert.assertTrue;
 
 
 public class TestQueryTranslator {
@@ -53,11 +54,11 @@ public class TestQueryTranslator {
   // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext
   private void validateClonedTranslatorContext(TranslatorContext 
originContext, TranslatorContext clonedContext) {
     Assert.assertNotEquals(originContext, clonedContext);
-    Assert.assertTrue(originContext.getExpressionCompiler() == 
clonedContext.getExpressionCompiler());
-    Assert.assertTrue(originContext.getStreamAppDescriptor() == 
clonedContext.getStreamAppDescriptor());
-    Assert.assertTrue(Whitebox.getInternalState(originContext, 
"relSamzaConverters") == Whitebox.getInternalState(clonedContext, 
"relSamzaConverters"));
-    Assert.assertTrue(Whitebox.getInternalState(originContext, 
"messageStreams") == Whitebox.getInternalState(clonedContext, 
"messageStreams"));
-    Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == 
Whitebox.getInternalState(clonedContext, "relNodes"));
+    assertTrue(originContext.getExpressionCompiler() == 
clonedContext.getExpressionCompiler());
+    assertTrue(originContext.getStreamAppDescriptor() == 
clonedContext.getStreamAppDescriptor());
+    assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") 
== Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
+    assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == 
Whitebox.getInternalState(clonedContext, "messageStreams"));
+    assertTrue(Whitebox.getInternalState(originContext, "relNodes") == 
Whitebox.getInternalState(clonedContext, "relNodes"));
     Assert.assertNotEquals(originContext.getDataContext(), 
clonedContext.getDataContext());
     validateClonedExecutionContext(originContext.getExecutionContext(), 
clonedContext.getExecutionContext());
   }
@@ -65,11 +66,11 @@ public class TestQueryTranslator {
   private void validateClonedExecutionContext(SamzaSqlExecutionContext 
originContext,
       SamzaSqlExecutionContext clonedContext) {
     Assert.assertNotEquals(originContext, clonedContext);
-    Assert.assertTrue(
+    assertTrue(
         Whitebox.getInternalState(originContext, "sqlConfig") == 
Whitebox.getInternalState(clonedContext, "sqlConfig"));
-    Assert.assertTrue(Whitebox.getInternalState(originContext, "udfMetadata") 
== Whitebox.getInternalState(clonedContext,
+    assertTrue(Whitebox.getInternalState(originContext, "udfMetadata") == 
Whitebox.getInternalState(clonedContext,
         "udfMetadata"));
-    Assert.assertTrue(Whitebox.getInternalState(originContext, "udfInstances") 
!= Whitebox.getInternalState(clonedContext,
+    assertTrue(Whitebox.getInternalState(originContext, "udfInstances") != 
Whitebox.getInternalState(clonedContext,
         "udfInstances"));
   }
 
@@ -121,18 +122,14 @@ public class TestQueryTranslator {
 
   private void validatePerTaskContextInit(StreamApplicationDescriptorImpl 
appDesc, Config samzaConfig) {
     // make sure that each task context would have a separate instance of cloned TranslatorContext
-    TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 
1"), null, null,
-        new HashSet<>(), null, null, null, null, null, null);
-    // call ContextManager.bootstrap() to instantiate the per-task TranslatorContext
-    appDesc.getContextManager().init(samzaConfig, testContext);
-    Assert.assertNotNull(testContext.getUserContext());
-    Assert.assertTrue(testContext.getUserContext() instanceof 
TranslatorContext);
-    TranslatorContext contextPerTaskOne = (TranslatorContext) 
testContext.getUserContext();
-    // call ContextManager.bootstrap() second time to instantiate another clone of TranslatorContext
-    appDesc.getContextManager().init(samzaConfig, testContext);
-    Assert.assertTrue(testContext.getUserContext() instanceof 
TranslatorContext);
-    // validate the two copies of TranslatorContext are clones of each other
-    validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) 
testContext.getUserContext());
+    ApplicationTaskContext contextPerTaskOne =
+        appDesc.getApplicationTaskContextFactory().get().create(null, null, 
null, null);
+    ApplicationTaskContext contextPerTaskTwo =
+        appDesc.getApplicationTaskContextFactory().get().create(null, null, 
null, null);
+    assertTrue(contextPerTaskOne instanceof SamzaSqlApplicationContext);
+    assertTrue(contextPerTaskTwo instanceof SamzaSqlApplicationContext);
+    validateClonedTranslatorContext(((SamzaSqlApplicationContext) 
contextPerTaskOne).getTranslatorContext(),
+        ((SamzaSqlApplicationContext) 
contextPerTaskTwo).getTranslatorContext());
   }
 
   @Test
@@ -734,7 +731,7 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(1, specGraph.getInputOperators().size());
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertTrue(specGraph.hasWindowOrJoins());
+    assertTrue(specGraph.hasWindowOrJoins());
     Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index dfc4b42..e991c4e 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -38,7 +39,6 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaOutputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
 
@@ -115,8 +115,9 @@ public class KeyValueStoreExample implements StreamApplication {
     }
 
     @Override
-    public void init(Config config, TaskContext context) {
-      this.statsStore = (KeyValueStore<String, StatsWindowState>) 
context.getStore("my-stats-wnd-store");
+    public void init(Context context) {
+      this.statsStore =
+          (KeyValueStore<String, StatsWindowState>) 
context.getTaskContext().getStore("my-stats-wnd-store");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
index 1a1c24c..de98feb 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
@@ -21,9 +21,7 @@ package org.apache.samza.test.framework;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.serializers.KVSerde;
@@ -31,10 +29,11 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.hamcrest.Matchers;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -148,10 +147,12 @@ class MessageStreamAssert<M> {
     }
 
     @Override
-    public void init(Config config, TaskContext context) {
-      final SystemStreamPartition ssp = 
Iterables.getFirst(context.getSystemStreamPartitions(), null);
+    public void init(Context context) {
+      final SystemStreamPartition ssp =
+          Iterables.getFirst(context.getTaskContext().getTaskModel().getSystemStreamPartitions(), null);
       if (ssp != null || ssp.getPartition().getPartitionId() == 0) {
-        final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1;
+        final int count =
+            checkEachTask ? context.getContainerContext().getContainerModel().getTasks().keySet().size() : 1;
         LATCHES.put(id, new CountDownLatch(count));
         timer.schedule(timerTask, TIMEOUT);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
index 4309d92..80057d4 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
@@ -19,15 +19,14 @@
 
 package org.apache.samza.test.integration;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 import org.apache.samza.util.StreamUtil;
@@ -53,9 +52,9 @@ public class NegateNumberTask implements StreamTask, InitableTask {
   private SystemStream outputSystemStream;
 
   @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    maxMessages = config.getInt("task.max.messages", 50);
-    String outputSystemStreamString = config.get("task.outputs", null);
+  public void init(Context context) throws Exception {
+    maxMessages = 
context.getJobContext().getConfig().getInt("task.max.messages", 50);
+    String outputSystemStreamString = 
context.getJobContext().getConfig().get("task.outputs", null);
     if (outputSystemStreamString == null) {
       throw new ConfigException("Missing required configuration: task.outputs");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 6fafabc..213fc71 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.test.integration;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -27,22 +27,21 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 
 /**
  * This is a simple task that writes each message to a state store and prints 
them all out on reload.
- * 
+ *
  * It is useful for command line testing with the kafka console producer and 
consumer and text messages.
  */
 public class SimpleStatefulTask implements StreamTask, InitableTask {
-  
+
   private KeyValueStore<String, String> store;
 
   @SuppressWarnings("unchecked")
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, String>) context.getStore("mystore");
+  public void init(Context context) {
+    this.store = (KeyValueStore<String, String>) 
context.getTaskContext().getStore("mystore");
     System.out.println("Contents of store: ");
     KeyValueIterator<String, String> iter = store.all();
     while (iter.hasNext()) {
@@ -51,7 +50,7 @@ public class SimpleStatefulTask implements StreamTask, InitableTask {
     }
     iter.close();
   }
-  
+
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     System.out.println("Adding " + envelope.getMessage() + " => " + 
envelope.getMessage() + " to the store.");
     store.put((String) envelope.getMessage(), (String) envelope.getMessage());

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index 1d524a8..a637ea0 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -19,13 +19,12 @@
 
 package org.apache.samza.test.integration;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 
@@ -41,10 +40,10 @@ public class StatePerfTestTask implements StreamTask, InitableTask {
   private long start = System.currentTimeMillis();
 
   @SuppressWarnings("unchecked")
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, String>) context.getStore("mystore");
+  public void init(Context context) {
+    this.store = (KeyValueStore<String, String>) 
context.getTaskContext().getStore("mystore");
   }
-  
+
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     store.put((String) envelope.getMessage(), (String) envelope.getMessage());
     count++;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index ea3aeb1..0238d23 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.test.integration.join;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -29,14 +29,13 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Checker implements StreamTask, WindowableTask, InitableTask {
-  
+
   private static Logger logger = LoggerFactory.getLogger(Checker.class);
 
   private static final String CURRENT_EPOCH = "current-epoch";
@@ -44,13 +43,13 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
   private KeyValueStore<String, String> store;
   private int expectedKeys;
   private int numPartitions;
-  
+
   @Override
   @SuppressWarnings("unchecked")
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, String>) 
context.getStore("checker-state");
-    this.expectedKeys = config.getInt("expected.keys");
-    this.numPartitions = config.getInt("num.partitions");
+  public void init(Context context) {
+    this.store = (KeyValueStore<String, String>) 
context.getTaskContext().getStore("checker-state");
+    this.expectedKeys = 
context.getJobContext().getConfig().getInt("expected.keys");
+    this.numPartitions = 
context.getJobContext().getConfig().getInt("num.partitions");
   }
 
   @Override
@@ -61,7 +60,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
     checkEpoch(epoch);
     this.store.put(key, epoch);
   }
-  
+
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
     String currentEpoch = this.store.get(CURRENT_EPOCH);
@@ -93,7 +92,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
       logger.info("Only found " + count + " valid keys, try again later.");
     }
   }
-  
+
   private void checkEpoch(String epoch) {
     String curr = this.store.get(CURRENT_EPOCH);
     if (curr == null)

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index 6661d8e..38ef6e1 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.test.integration.join;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -28,7 +28,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 import org.apache.samza.task.WindowableTask;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("unchecked")
 public class Emitter implements StreamTask, InitableTask, WindowableTask {
-  
+
   private static Logger logger = LoggerFactory.getLogger(Emitter.class);
-  
+
   private static final String EPOCH = "the-epoch";
   private static final String COUNT = "the-count";
 
@@ -49,10 +48,10 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
   private TaskName taskName;
 
   @Override
-  public void init(Config config, TaskContext context) {
-    this.state = (KeyValueStore<String, String>) 
context.getStore("emitter-state");
-    this.taskName = context.getTaskName();
-    this.max = config.getInt("count");
+  public void init(Context context) {
+    this.state = (KeyValueStore<String, String>) 
context.getTaskContext().getStore("emitter-state");
+    this.taskName = context.getTaskContext().getTaskModel().getTaskName();
+    this.max = context.getJobContext().getConfig().getInt("count");
   }
 
   @Override
@@ -66,7 +65,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
         return;
       if (newEpoch < epoch)
         throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch);
-      
+
       // it's a new era, reset current epoch and count
       logger.info("Epoch: " + newEpoch);
       this.state.put(EPOCH, Integer.toString(newEpoch));
@@ -74,7 +73,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
       coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
     }
   }
-  
+
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
     Integer epoch = getInt(EPOCH);
     if (epoch == null) {
@@ -89,13 +88,13 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
       this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
     }
   }
-  
+
   private void resetEpoch() {
     logger.info("Resetting epoch to 0");
     state.put(EPOCH, "0");
     state.put(COUNT, "0");
   }
-  
+
   private Integer getInt(String key) {
     String value = this.state.get(key);
     return value == null ? null : Integer.parseInt(value);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
index d1dd1f8..d006bdd 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
@@ -21,9 +21,8 @@ package org.apache.samza.test.integration.join;
 
 import java.util.HashSet;
 import java.util.Set;
-
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -31,25 +30,24 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("unchecked")
 public class Joiner implements StreamTask, InitableTask {
-  
+
   private static Logger logger = LoggerFactory.getLogger(Joiner.class);
-  
+
   private KeyValueStore<String, String> store;
   private int expected;
   private TaskName taskName;
 
   @Override
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, String>) 
context.getStore("joiner-state");
-    this.expected = config.getInt("num.partitions");
-    this.taskName = context.getTaskName();
+  public void init(Context context) {
+    this.store = (KeyValueStore<String, String>) 
context.getTaskContext().getStore("joiner-state");
+    this.expected = 
context.getJobContext().getConfig().getInt("num.partitions");
+    this.taskName = context.getTaskContext().getTaskModel().getTaskName();
   }
 
   @Override
@@ -83,7 +81,7 @@ public class Joiner implements StreamTask, InitableTask {
     this.store.put(key, partitions.toString());
     logger.info("Join store in Task " + this.taskName + " " + key + " -> " + 
partitions.toString());
   }
-  
+
   private Partitions loadPartitions(int epoch, String key) {
     String current = this.store.get(key);
     Partitions partitions;
@@ -93,16 +91,16 @@ public class Joiner implements StreamTask, InitableTask {
       partitions = Partitions.parse(current);
     return partitions;
   }
-  
+
   private static class Partitions {
     int epoch;
     Set<Integer> partitions;
-    
+
     public Partitions(int epoch, Set<Integer> partitions) {
       this.epoch = epoch;
       this.partitions = partitions;
     }
-    
+
     public static Partitions parse(String s) {
       String[] pieces = s.split("\\|", -1);
       int epoch = Integer.parseInt(pieces[1]);
@@ -111,7 +109,7 @@ public class Joiner implements StreamTask, InitableTask {
         set.add(Integer.parseInt(pieces[i]));
       return new Partitions(epoch, set);
     }
-    
+
     public String toString() {
       StringBuilder b = new StringBuilder("|");
       b.append(epoch);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
index 4a299b6..b8ab073 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
@@ -19,31 +19,30 @@
 
 package org.apache.samza.test.integration.join;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Watcher implements StreamTask, WindowableTask, InitableTask {
-  
+
   private static Logger logger = LoggerFactory.getLogger(Watcher.class);
 
   private boolean inError = false;
   private long lastEpochChange = System.currentTimeMillis();
   private long maxTimeBetweenEpochsMs;
   private int currentEpoch = 0;
-  
+
   @Override
-  public void init(Config config, TaskContext context) {
-    this.maxTimeBetweenEpochsMs = config.getLong("max.time.between.epochs.ms");
+  public void init(Context context) {
+    this.maxTimeBetweenEpochsMs = 
context.getJobContext().getConfig().getLong("max.time.between.epochs.ms");
   }
-  
+
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     int epoch = Integer.parseInt((String) envelope.getMessage());
@@ -54,7 +53,7 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask {
       this.inError = false;
     }
   }
-  
+
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
     boolean isLagging = System.currentTimeMillis() - lastEpochChange > 
maxTimeBetweenEpochsMs;
@@ -64,5 +63,5 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask {
       logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60 * 1000)));
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 36c86cc..1c2b333 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -20,19 +20,20 @@
 package org.apache.samza.test.performance
 
 import java.io.File
-import java.util
-import java.util.UUID
 import java.util.concurrent.TimeUnit
+import java.util.{Collections, UUID}
 
 import com.google.common.base.Stopwatch
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.{SamzaContainerContext, TaskName}
+import org.apache.samza.container.TaskName
+import org.apache.samza.context.{ContainerContextImpl, JobContextImpl}
+import org.apache.samza.job.model.{ContainerModel, TaskModel}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde}
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore}
-import org.apache.samza.system.{SystemProducer, SystemProducers}
+import org.apache.samza.system.{SystemProducer, SystemProducers, 
SystemStreamPartition}
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.{CommandLine, FileUtil, Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
@@ -84,9 +85,14 @@ object TestKeyValuePerformance extends Logging {
   }
 
   def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) {
-    val taskNames = new java.util.ArrayList[TaskName]()
     val partitionCount = config.getInt("partition.count", 1)
-    (0 until partitionCount).map(p => taskNames.add(new TaskName(new 
Partition(p).toString)))
+    val tasks = (0 until partitionCount)
+      .map(i => new Partition(i))
+      .map(partition => (new TaskName(partition.toString),
+        new TaskModel(new TaskName(partition.toString),
+          Collections.singleton(new SystemStreamPartition("system", "stream", partition)),
+          partition)))
+      .toMap
 
     val producerMultiplexer = new SystemProducers(
       Map[String, SystemProducer](),
@@ -116,7 +122,8 @@ object TestKeyValuePerformance extends Logging {
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          new SamzaContainerContext("0", config, taskNames, new 
MetricsRegistryMap)
+          JobContextImpl.fromConfigWithDefaults(config),
+          new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap)
         )
 
         val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 99d047d..5ab8c2c 100644
--- 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -19,21 +19,15 @@
 
 package org.apache.samza.test.performance
 
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.InitableTask
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.context.Context
+import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
 import org.apache.samza.task.TaskCoordinator.RequestScope
-import org.apache.samza.config.Config
+import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskCoordinator}
 import org.apache.samza.util.{Logging, StreamUtil}
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.OutgoingMessageEnvelope
 
 
 object TestPerformanceTask {
-  // No thread safety is needed for these variables because they're mutated in 
+  // No thread safety is needed for these variables because they're mutated in
   // the process method, which is single threaded.
   var messagesProcessed = 0
   var startTime = 0L
@@ -59,7 +53,7 @@ object TestPerformanceTask {
  * <pre>
  *   task.outputs=kafka.MyOutputTopic
  * <pre>
- * 
+ *
  * If undefined, the task simply drops incoming messages, rather than
  * forwarding them to the output stream.
  */
@@ -82,7 +76,8 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
    */
   var outputSystemStream: Option[SystemStream] = None
 
-  def init(config: Config, context: TaskContext) {
+  def init(context: Context) {
+    val config = context.getJobContext.getConfig
     logInterval = config.getInt("task.log.interval", 10000)
     maxMessages = config.getInt("task.max.messages", 10000000)
     outputSystemStream = Option(config.get("task.outputs", null)).map(StreamUtil.getSystemStreamFromNames)

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 66cf061..585af0f 100644
--- 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -44,6 +44,7 @@ import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.context.Context;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
@@ -54,7 +55,6 @@ import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
@@ -340,8 +340,8 @@ public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness
     protected String processorIdToFail;
 
     @Override
-    public void init(Config config, TaskContext taskContext)
-        throws Exception {
+    public void init(Context context) {
+      Config config = context.getJobContext().getConfig();
       this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
       this.outputTopic = config.get("app.outputTopic", "output");
       this.outputSystem = config.get("app.outputSystem", "test-system");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
index db78e8c..1644a0f 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -26,8 +26,8 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.serializers.JsonSerdeV2;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java
index 4f1d1df..16e8777 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/IdentityStreamTask.java
@@ -20,13 +20,13 @@
 package org.apache.samza.test.processor;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
@@ -37,7 +37,8 @@ public class IdentityStreamTask implements StreamTask , 
InitableTask  {
   private String outputSystem;
 
   @Override
-  public void init(Config config, TaskContext taskContext) throws Exception {
+  public void init(Context context) throws Exception {
+    Config config = context.getJobContext().getConfig();
     this.expectedMessageCount = config.getInt("app.messageCount");
     this.outputTopic = config.get("app.outputTopic", "output");
     this.outputSystem = config.get("app.outputSystem", "test-system");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index fc62b0a..e7040ca 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -52,7 +52,10 @@ import org.junit.Assert;
 import org.junit.Test;
 import scala.Option$;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index da8af9e..d24cf57 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -28,13 +28,13 @@ import java.util.Map;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -61,7 +61,6 @@ import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
@@ -69,8 +68,12 @@ import org.apache.samza.test.util.Base64Serializer;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.samza.test.table.TestTableData.*;
-
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -80,6 +83,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 
 /**
  * This test class tests sendTo() and join() for local tables
@@ -333,11 +338,11 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
     private transient ReadableTable table;
 
     @Override
-    public void init(Config config, TaskContext context) {
-      table = (ReadableTable) context.getTable("t1");
+    public void init(Context context) {
+      table = (ReadableTable) context.getTaskContext().getTable("t1");
       this.received = new ArrayList<>();
 
-      taskToMapFunctionMap.put(context.getTaskName().getTaskName(), this);
+      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
     }
 
     @Override
@@ -355,16 +360,16 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
   public void testAsyncOperation() throws Exception {
     KeyValueStore kvStore = mock(KeyValueStore.class);
     LocalStoreBackedReadWriteTable<String, String> table = new 
LocalStoreBackedReadWriteTable<>("table1", kvStore);
+    Context context = mock(Context.class);
     TaskContext taskContext = mock(TaskContext.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), 
anyString());
     
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), 
anyString());
     doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), 
any());
-    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
-
-    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+    doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
 
-    table.init(containerContext, taskContext);
+    table.init(context);
 
     // GET
     doReturn("bar").when(kvStore).get(anyString());
@@ -423,8 +428,8 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
   static public class MyStreamTask implements StreamTask, InitableTask {
     private ReadWriteTable<Integer, PageView> pageViewTable;
     @Override
-    public void init(Config config, TaskContext context) throws Exception {
-      pageViewTable = (ReadWriteTable<Integer, PageView>) 
context.getTable("t1");
+    public void init(Context context) throws Exception {
+      pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
     }
     @Override
     public void process(IncomingMessageEnvelope message, MessageCollector 
collector, TaskCoordinator coordinator) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index c31052d..2fa00fe 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -20,17 +20,9 @@
 package org.apache.samza.test.table;
 
 import com.google.common.collect.ImmutableList;
-
-import java.nio.file.FileSystems;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
@@ -51,7 +43,18 @@ import 
org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Test;
 
-import static org.apache.samza.test.table.TestTableData.*;
+import java.nio.file.FileSystems;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index e23cb58..a48bb7f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -34,17 +34,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.table.Table;
@@ -56,18 +57,22 @@ import org.apache.samza.table.remote.RemoteTableDescriptor;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.samza.test.table.TestTableData.*;
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.generatePageViews;
+import static org.apache.samza.test.table.TestTableData.generateProfiles;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 
@@ -239,12 +244,14 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     doTestStreamTableJoinRemoteTable(true, true, 
"testStreamTableJoinRemoteTableWithDefaultCache");
   }
 
-  private TaskContext createMockTaskContext() {
+  private Context createMockContext() {
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), 
anyString());
     doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), 
anyString());
-    TaskContext context = mock(TaskContext.class);
-    doReturn(metricsRegistry).when(context).getMetricsRegistry();
+    Context context = mock(Context.class);
+    TaskContext taskContext = mock(TaskContext.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
+    doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
     return context;
   }
 
@@ -257,7 +264,7 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
     RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
         "table1", reader, rateLimitHelper, 
Executors.newSingleThreadExecutor(), null);
-    table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    table.init(createMockContext());
     table.get("abc");
   }
 
@@ -271,7 +278,7 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
     RemoteReadWriteTable<String, String> table = new 
RemoteReadWriteTable<String, String>(
         "table1", reader, writer, rateLimitHelper, rateLimitHelper, 
Executors.newSingleThreadExecutor(), null);
-    table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    table.init(createMockContext());
     table.put("abc", "efg");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 6186ca7..f868fdc 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.ConfigRewriter;
@@ -50,7 +49,7 @@ import org.apache.samza.util.Util;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 8405c63..d248dac 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -22,8 +22,8 @@ package org.apache.samza.test.integration
 import java.util
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
 
+import javax.security.auth.login.Configuration
 import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.message.MessageAndMetadata
@@ -31,13 +31,14 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
 import kafka.zk.EmbeddedZookeeper
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
-import org.apache.samza.Partition
-import org.apache.samza.checkpoint.Checkpoint
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
+import org.apache.samza.context.Context
+import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -45,7 +46,7 @@ import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task._
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, 
TopicMetadataStore, Util}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -336,9 +337,9 @@ abstract class TestTask extends StreamTask with 
InitableTask {
   val eventProcessed = new CountDownLatch(1)
   @volatile var gotMessage = new CountDownLatch(1)
 
-  def init(config: Config, context: TaskContext) {
-    TestTask.register(context.getTaskName, this)
-    testInit(config, context)
+  def init(context: Context) {
+    TestTask.register(context.getTaskContext.getTaskModel.getTaskName, this)
+    testInit(context)
     initFinished.countDown()
   }
 
@@ -363,7 +364,7 @@ abstract class TestTask extends StreamTask with 
InitableTask {
     gotMessage = new CountDownLatch(1)
   }
 
-  def testInit(config: Config, context: TaskContext)
+  def testInit(context: Context)
 
   def testProcess(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index ccb7cd4..4de0260 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -19,10 +19,10 @@
 
 package org.apache.samza.test.integration
 
-import org.apache.samza.config.Config
+import org.apache.samza.context.Context
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
+import org.apache.samza.task.{MessageCollector, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
@@ -112,8 +112,8 @@ class ShutdownStateStoreTask extends TestTask {
   var store: KeyValueStore[String, String] = null
   var restored = scala.collection.mutable.Map[String, String]()
 
-  override def testInit(config: Config, context: TaskContext) {
-    store = context
+  override def testInit(context: Context) {
+    store = context.getTaskContext
       .getStore(TestShutdownStatefulTask.STORE_NAME)
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index ccd5eaa..cf6d4fe 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -19,11 +19,11 @@
 
 package org.apache.samza.test.integration
 
-import org.apache.samza.config.Config
+import org.apache.samza.context.Context
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.TaskCoordinator.RequestScope
-import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
+import org.apache.samza.task.{MessageCollector, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
@@ -146,8 +146,8 @@ class StateStoreTestTask extends TestTask {
   var store: KeyValueStore[String, String] = null
   var restored = Set[String]()
 
-  override def testInit(config: Config, context: TaskContext): Unit = {
-    store = 
context.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String,
 String]]
+  override def testInit(context: Context): Unit = {
+    store = context.getTaskContext.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     restored ++= iter
       .asScala

Reply via email to