SAMZA-1714: Creating shared context factory for shared context objects

<s>This includes changes in https://github.com/apache/samza/pull/626.</s>
Update: PR #626 has been merged, so the diff here should no longer show those 
changes.

Author: Cameron Lee <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #672 from cameronlee314/shared_context_impl


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d2d49e9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d2d49e9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d2d49e9

Branch: refs/heads/master
Commit: 9d2d49e9e987699f053a46eb12a64cfe8cc42907
Parents: d2c9e81
Author: Cameron Lee <[email protected]>
Authored: Wed Oct 10 15:34:19 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Oct 10 15:34:19 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |   4 +-
 .../versioned/hello-samza-high-level-code.md    |   2 +-
 .../application/ApplicationDescriptor.java      |  30 +++-
 .../samza/container/SamzaContainerContext.java  |  55 ------
 .../context/ApplicationContainerContext.java    |   7 +-
 .../samza/context/ApplicationTaskContext.java   |   4 +
 .../org/apache/samza/context/JobContext.java    |   1 +
 .../apache/samza/operators/ContextManager.java  |  49 ------
 .../operators/functions/InitableFunction.java   |   9 +-
 .../samza/scheduler/CallbackScheduler.java      |   1 +
 .../samza/scheduler/ScheduledCallback.java      |   5 +-
 .../samza/storage/StorageEngineFactory.java     |   8 +-
 .../org/apache/samza/table/ReadableTable.java   |   9 +-
 .../org/apache/samza/table/TableProvider.java   |   9 +-
 .../org/apache/samza/task/InitableTask.java     |   6 +-
 .../java/org/apache/samza/task/TaskContext.java |  98 -----------
 .../java/org/apache/samza/util/RateLimiter.java |   9 +-
 .../application/ApplicationDescriptorImpl.java  |  55 +++---
 .../apache/samza/container/TaskContextImpl.java | 169 -------------------
 .../org/apache/samza/context/ContextImpl.java   |  60 +++++--
 .../apache/samza/context/JobContextImpl.java    |  22 ++-
 .../apache/samza/context/TaskContextImpl.java   |  34 +++-
 .../operators/impl/BroadcastOperatorImpl.java   |   9 +-
 .../samza/operators/impl/InputOperatorImpl.java |   5 +-
 .../samza/operators/impl/OperatorImpl.java      |  54 +++---
 .../samza/operators/impl/OperatorImplGraph.java |  84 ++++-----
 .../operators/impl/OutputOperatorImpl.java      |   5 +-
 .../operators/impl/PartialJoinOperatorImpl.java |  11 +-
 .../operators/impl/PartitionByOperatorImpl.java |  17 +-
 .../operators/impl/SendToTableOperatorImpl.java |  15 +-
 .../samza/operators/impl/SinkOperatorImpl.java  |   9 +-
 .../operators/impl/StreamOperatorImpl.java      |   7 +-
 .../impl/StreamTableJoinOperatorImpl.java       |  18 +-
 .../operators/impl/WindowOperatorImpl.java      |  15 +-
 .../operators/spec/FilterOperatorSpec.java      |   7 +-
 .../samza/operators/spec/MapOperatorSpec.java   |   7 +-
 .../apache/samza/processor/StreamProcessor.java |  78 ++++++---
 .../samza/runtime/LocalApplicationRunner.java   |   3 +-
 .../samza/runtime/LocalContainerRunner.java     |  11 +-
 .../apache/samza/storage/StorageRecovery.java   |   9 +-
 .../org/apache/samza/table/TableManager.java    |  18 +-
 .../samza/table/caching/CachingTable.java       |  28 ++-
 .../table/caching/CachingTableProvider.java     |   8 +-
 .../table/caching/guava/GuavaCacheTable.java    |  18 +-
 .../caching/guava/GuavaCacheTableProvider.java  |   2 +-
 .../table/remote/RemoteReadWriteTable.java      |  24 ++-
 .../samza/table/remote/RemoteReadableTable.java |  30 ++--
 .../samza/table/remote/RemoteTableProvider.java |  26 +--
 .../samza/table/utils/BaseTableProvider.java    |  11 +-
 .../table/utils/DefaultTableReadMetrics.java    |  11 +-
 .../table/utils/DefaultTableWriteMetrics.java   |  11 +-
 .../samza/table/utils/TableMetricsUtil.java     |  21 +--
 .../org/apache/samza/task/AsyncRunLoop.java     |   5 +-
 .../samza/task/AsyncStreamTaskAdapter.java      |   6 +-
 .../apache/samza/task/StreamOperatorTask.java   |  31 +---
 .../org/apache/samza/task/TaskFactoryUtil.java  |   4 +-
 .../samza/util/EmbeddedTaggedRateLimiter.java   |  30 ++--
 .../apache/samza/container/SamzaContainer.scala |  67 +++++---
 .../apache/samza/container/TaskInstance.scala   |  53 ++++--
 .../samza/job/local/ThreadJobFactory.scala      |   8 +-
 .../TestStreamApplicationDescriptorImpl.java    |  37 +++-
 .../TestTaskApplicationDescriptorImpl.java      |  36 +++-
 .../org/apache/samza/context/MockContext.java   |  73 ++++++++
 .../apache/samza/context/TestContextImpl.java   |  12 +-
 .../samza/context/TestTaskContextImpl.java      |  15 +-
 .../TestJobNodeConfigurationGenerator.java      |  22 +--
 .../samza/operators/TestJoinOperator.java       |  29 ++--
 .../samza/operators/impl/TestOperatorImpl.java  |  52 +++---
 .../operators/impl/TestOperatorImplGraph.java   | 137 +++++++--------
 .../operators/impl/TestSinkOperatorImpl.java    |   7 +-
 .../operators/impl/TestStreamOperatorImpl.java  |   6 -
 .../impl/TestStreamTableJoinOperatorImpl.java   |  17 +-
 .../operators/impl/TestWindowOperator.java      | 114 +++++++------
 .../samza/operators/spec/TestOperatorSpec.java  |   2 +-
 .../spec/TestPartitionByOperatorSpec.java       |   2 +-
 .../operators/spec/TestWindowOperatorSpec.java  |   9 +-
 .../samza/processor/TestStreamProcessor.java    |   9 +-
 .../samza/storage/MockStorageEngineFactory.java |  16 +-
 .../apache/samza/table/TestTableManager.java    |  18 +-
 .../samza/table/caching/TestCachingTable.java   |  48 +++---
 .../samza/table/remote/TestRemoteTable.java     |  39 ++---
 .../table/remote/TestRemoteTableDescriptor.java |  41 +++--
 .../retry/TestRetriableTableFunctions.java      |  12 +-
 .../apache/samza/task/IdentityStreamTask.java   |   6 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |  44 ++++-
 .../samza/task/TestAsyncStreamAdapter.java      |   6 +-
 .../samza/task/TestEpochTimeScheduler.java      |   3 +-
 .../samza/task/TestStreamOperatorTask.java      |  30 ++--
 .../util/TestEmbeddedTaggedRateLimiter.java     |  48 +++---
 .../samza/container/TestSamzaContainer.scala    |  51 ++++--
 .../samza/container/TestTaskInstance.scala      |  90 +++++++---
 .../processor/StreamProcessorTestUtils.scala    |  31 ++--
 .../InMemoryKeyValueStorageEngineFactory.scala  |  13 +-
 .../samza/storage/kv/RocksDbKeyValueReader.java |  11 +-
 .../samza/storage/kv/RocksDbOptionsHelper.java  |  15 +-
 .../RocksDbKeyValueStorageEngineFactory.scala   |  23 +--
 .../storage/kv/TestRocksDbTableDescriptor.java  |   3 +-
 .../kv/BaseLocalStoreBackedTableProvider.java   |  18 +-
 .../kv/LocalStoreBackedReadWriteTable.java      |  10 +-
 .../kv/LocalStoreBackedReadableTable.java       |  10 +-
 .../kv/BaseKeyValueStorageEngineFactory.scala   |  41 ++---
 .../TestBaseLocalStoreBackedTableProvider.java  |  18 +-
 .../sql/runner/SamzaSqlApplicationContext.java  |  44 +++++
 .../samza/sql/translator/FilterTranslator.java  |   9 +-
 .../samza/sql/translator/ModifyTranslator.java  |  11 +-
 .../samza/sql/translator/ProjectTranslator.java |   8 +-
 .../samza/sql/translator/QueryTranslator.java   |  49 +++---
 .../samza/sql/translator/ScanTranslator.java    |  11 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |   1 -
 .../runner/TestSamzaSqlApplicationRunner.java   |   2 -
 .../samza/sql/system/TestAvroSystemFactory.java |   1 -
 .../sql/testutil/TestIOResolverFactory.java     |   1 -
 .../sql/testutil/TestSamzaSqlFileParser.java    |   2 -
 .../sql/translator/TestFilterTranslator.java    |  16 +-
 .../sql/translator/TestProjectTranslator.java   |  25 +--
 .../sql/translator/TestQueryTranslator.java     |  47 +++---
 .../samza/example/KeyValueStoreExample.java     |   7 +-
 .../test/framework/MessageStreamAssert.java     |  15 +-
 .../test/integration/NegateNumberTask.java      |   9 +-
 .../test/integration/SimpleStatefulTask.java    |  13 +-
 .../test/integration/StatePerfTestTask.java     |   9 +-
 .../samza/test/integration/join/Checker.java    |  19 +--
 .../samza/test/integration/join/Emitter.java    |  23 ++-
 .../samza/test/integration/join/Joiner.java     |  26 ++-
 .../samza/test/integration/join/Watcher.java    |  17 +-
 .../performance/TestKeyValuePerformance.scala   |  21 ++-
 .../test/performance/TestPerformanceTask.scala  |  19 +--
 .../processor/TestZkStreamProcessorBase.java    |   6 +-
 .../samza/test/framework/TestSchedulingApp.java |   2 +-
 .../test/processor/IdentityStreamTask.java      |   5 +-
 .../test/processor/TestStreamProcessor.java     |   5 +-
 .../apache/samza/test/table/TestLocalTable.java |  33 ++--
 .../table/TestLocalTableWithSideInputs.java     |  23 +--
 .../samza/test/table/TestRemoteTable.java       |  27 +--
 .../table/TestTableDescriptorsProvider.java     |   3 +-
 .../test/integration/StreamTaskTestUtil.scala   |  19 ++-
 .../integration/TestShutdownStatefulTask.scala  |   8 +-
 .../test/integration/TestStatefulTask.scala     |   8 +-
 138 files changed, 1598 insertions(+), 1592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 35ddcab..44d43f3 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1626,9 +1626,9 @@
                         store any <span class="store">store-name</span> except 
<em>default</em> (the <span class="store">store-name</span>
                         <em>default</em> is reserved for defining default 
store parameters), and use that name to get a
                         reference to the store in your stream task (call
-                        <a 
href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
+                        <a 
href="../api/javadocs/org/apache/samza/context/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
                         in your task's
-                        <a 
href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config,
 org.apache.samza.task.TaskContext)">init()</a>
+                        <a 
href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.context.Context)">init()</a>
                         method). The value of this property is the 
fully-qualified name of a Java class that implements
                         <a 
href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>.
                         Samza currently ships with one storage engine 
implementation:

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md 
b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
index 2f6a4a6..1c06116 100644
--- a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
+++ b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
@@ -357,7 +357,7 @@ To use the store in the application, we need to get it from 
the [TaskContext](/l
 private KeyValueStore<String, Integer> store;
 {% endhighlight %}
 
-Then override the 
[init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.config.Config-org.apache.samza.task.TaskContext-)
 method in `WikipediaStatsAggregator` to initialize the store.
+Then override the 
[init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-)
 method in `WikipediaStatsAggregator` to initialize the store.
 {% highlight java %}
 @Override
 public void init(Config config, TaskContext context) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
index 178fdee..e806aad 100644
--- 
a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
+++ 
b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
@@ -21,8 +21,9 @@ package org.apache.samza.application;
 import java.util.Map;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.operators.ContextManager;
 import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
 
 
@@ -44,17 +45,30 @@ public interface ApplicationDescriptor<S extends 
ApplicationDescriptor> {
   Config getConfig();
 
   /**
-   * Sets the {@link ContextManager} for this application.
+   * Sets the {@link ApplicationContainerContextFactory} for this application. 
Each task will be given access to a
+   * different instance of the {@link 
org.apache.samza.context.ApplicationContainerContext} that this creates. The
+   * context can be accessed through the {@link 
org.apache.samza.context.Context}.
    * <p>
-   * Setting the {@link ContextManager} is optional. The provided {@link 
ContextManager} can be used to build the shared
-   * context between the operator functions within a task instance
+   * Setting this is optional.
    *
-   * TODO: this should be replaced by the shared context factory when 
SAMZA-1714 is fixed.
+   * @param factory the {@link ApplicationContainerContextFactory} for this 
application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
+   * {@link ApplicationContainerContextFactory}
+   */
+  S 
withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> 
factory);
 
-   * @param contextManager the {@link ContextManager} to use for the 
application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
contextManager} set as its {@link ContextManager}
+  /**
+   * Sets the {@link ApplicationTaskContextFactory} for this application. Each 
task will be given access to a different
+   * instance of the {@link org.apache.samza.context.ApplicationTaskContext} 
that this creates. The context can be
+   * accessed through the {@link org.apache.samza.context.Context}.
+   * <p>
+   * Setting this is optional.
+   *
+   * @param factory the {@link ApplicationTaskContextFactory} for this 
application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
+   * {@link ApplicationTaskContextFactory}
    */
-  S withContextManager(ContextManager contextManager);
+  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> 
factory);
 
   /**
    * Sets the {@link ProcessorLifecycleListenerFactory} for this application.

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
deleted file mode 100644
index 6e13f7a..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A SamzaContainerContext maintains per-container information for the tasks 
it executes.
- */
-public class SamzaContainerContext {
-  public final String id;
-  public final Config config;
-  public final Collection<TaskName> taskNames;
-  public final MetricsRegistry metricsRegistry;
-
-  /**
-   * An immutable context object that can passed to tasks to give them 
information
-   * about the container in which they are executing.
-   * @param id The id of the container.
-   * @param config The job configuration.
-   * @param taskNames The set of taskName keys for which this container is 
responsible.
-   * @param metricsRegistry the {@link MetricsRegistry} for the container 
metrics
-   */
-  public SamzaContainerContext(
-      String id,
-      Config config,
-      Collection<TaskName> taskNames,
-      MetricsRegistry metricsRegistry) {
-    this.id = id;
-    this.config = config;
-    this.taskNames = Collections.unmodifiableCollection(taskNames);
-    this.metricsRegistry = metricsRegistry;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
index 08e0b60..aab8c7f 100644
--- 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
+++ 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
@@ -20,11 +20,16 @@ package org.apache.samza.context;
 
 /**
  * An application should implement this to contain any runtime objects 
required by processing logic which can be shared
- * across all tasks in a container. A single instance of this will be created 
in each container.
+ * across all tasks in a container. A single instance of this will be created 
in each container. Note that if the
+ * container moves or the container model changes (e.g. container 
failure/rebalancing), then this will be recreated.
  * <p>
  * This needs to be created by an implementation of {@link 
ApplicationContainerContextFactory}. The factory should
  * create the runtime objects contained within this context.
  * <p>
+ * This is related to {@link ContainerContext} in that they are both 
associated with the container lifecycle. In order
+ * to access this in application code, use {@link 
Context#getApplicationContainerContext()}. The
+ * {@link ContainerContext} is accessible through {@link 
Context#getContainerContext()}.
+ * <p>
  * If it is necessary to have a separate instance per task, then use {@link 
ApplicationTaskContext} instead.
  * <p>
  * This class does not need to be {@link java.io.Serializable} and instances 
are not persisted across deployments.

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
index ffc5383..6afbf23 100644
--- 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
+++ 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
@@ -25,6 +25,10 @@ package org.apache.samza.context;
  * This needs to be created by an implementation of {@link 
ApplicationTaskContextFactory}. The factory should create
  * the runtime objects contained within this context.
  * <p>
+ * This is related to {@link TaskContext} in that they are both associated 
with a task lifecycle. In order to access
+ * this in application code, use {@link Context#getApplicationTaskContext()}. 
The {@link TaskContext} is accessible
+ * through {@link Context#getTaskContext()}.
+ * <p>
  * If it is possible to share an instance of this across tasks in a container, 
then use
  * {@link ApplicationContainerContext} instead.
  * <p>

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/JobContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java 
b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
index 9b09fa9..239a011 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
@@ -35,6 +35,7 @@ public interface JobContext {
   /**
    * Returns the name of the job.
    * @return name of the job
+   * @throws org.apache.samza.SamzaException if the job name was not configured
    */
   String getJobName();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java 
b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
deleted file mode 100644
index 5f2c020..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * Manages custom context that is shared across multiple operator functions in 
a task.
- */
-@InterfaceStability.Unstable
-public interface ContextManager {
-
-  /**
-   * Allows initializing and setting a custom context that is shared across 
multiple operator functions in a task.
-   * <p>
-   * This method is invoked before any {@link 
org.apache.samza.operators.functions.InitableFunction}s are initialized.
-   * Use {@link TaskContext#setUserContext(Object)} to set the context here 
and {@link TaskContext#getUserContext()} to
-   * get it in InitableFunctions.
-   *
-   * @param config the {@link Config} for the application
-   * @param context the {@link TaskContext} for this task
-   */
-  void init(Config config, TaskContext context);
-
-  /**
-   * Allows closing the custom context that is shared across multiple operator 
functions in a task.
-   */
-  void close();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 8a5d83b..7f950de 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -20,8 +20,7 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
 
 /**
  * A function that can be initialized before execution.
@@ -33,12 +32,10 @@ import org.apache.samza.task.TaskContext;
  */
 @InterfaceStability.Evolving
 public interface InitableFunction {
-
   /**
    * Initializes the function before any messages are processed.
    *
-   * @param config the {@link Config} for the application
-   * @param context the {@link TaskContext} for this task
+   * @param context the {@link Context} for this task
    */
-  default void init(Config config, TaskContext context) { }
+  default void init(Context context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java 
b/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java
index e230304..5c8d77d 100644
--- a/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java
+++ b/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.scheduler;
 
+
 /**
  * Provides a way for applications to register some logic to be executed at a 
future time.
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java 
b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
index 8745422..546ca37 100644
--- a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
+++ b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
@@ -24,8 +24,9 @@ import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * The callback that is invoked when its corresponding schedule time 
registered via
- * {@link org.apache.samza.task.TaskContext} is reached.
+ * The callback that is invoked when its corresponding schedule time 
registered via {@link CallbackScheduler} is
+ * reached. The {@link CallbackScheduler} is available through
+ * {@link org.apache.samza.context.TaskContext#getCallbackScheduler()}.
  * @param <K> type of the callback key
  */
 public interface ScheduledCallback<K> {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java 
b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
index 800deeb..2425cf3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
@@ -20,8 +20,8 @@
 package org.apache.samza.storage;
 
 import java.io.File;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemStreamPartition;
@@ -43,6 +43,7 @@ public interface StorageEngineFactory<K, V> {
    * @param collector MessageCollector the storage engine uses to persist 
changes.
    * @param registry MetricsRegistry to which to publish storage-engine 
specific metrics.
    * @param changeLogSystemStreamPartition Samza stream partition from which 
to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing
    * @param containerContext Information about the container in which the task 
is executing.
    * @return The storage engine instance.
    */
@@ -54,5 +55,6 @@ public interface StorageEngineFactory<K, V> {
     MessageCollector collector,
     MetricsRegistry registry,
     SystemStreamPartition changeLogSystemStreamPartition,
-    SamzaContainerContext containerContext);
+    JobContext jobContext,
+    ContainerContext containerContext);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
index 490acc0..6c88fd3 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
@@ -21,11 +21,9 @@ package org.apache.samza.table;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -40,10 +38,9 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> 
{
   /**
    * Initializes the table during container initialization.
    * Guaranteed to be invoked as the first operation on the table.
-   * @param containerContext Samza container context
-   * @param taskContext nullable for global table
+   * @param context {@link Context} corresponding to this table
    */
-  default void init(SamzaContainerContext containerContext, TaskContext 
taskContext) {
+  default void init(Context context) {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java 
b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 99446e4..350324c 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -19,11 +19,9 @@
 package org.apache.samza.table;
 
 import java.util.Map;
-
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
 
 /**
  * A table provider provides the implementation for a table. It ensures a 
table is
@@ -33,10 +31,9 @@ import org.apache.samza.task.TaskContext;
 public interface TableProvider {
   /**
    * Initialize TableProvider with container and task context
-   * @param containerContext Samza container context
-   * @param taskContext nullable for global table
+   * @param context context for the task
    */
-  void init(SamzaContainerContext containerContext, TaskContext taskContext);
+  void init(Context context);
 
   /**
    * Get an instance of the table for read/write operations

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/task/InitableTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/InitableTask.java b/samza-api/src/main/java/org/apache/samza/task/InitableTask.java
index 6926f16..8bf0619 100644
--- a/samza-api/src/main/java/org/apache/samza/task/InitableTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/InitableTask.java
@@ -19,7 +19,8 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+
 
 /**
  * Used as an interface for user processing StreamTasks that need to have 
specific functionality performed as their StreamTasks
@@ -28,9 +29,8 @@ import org.apache.samza.config.Config;
 public interface InitableTask {
   /**
    * Called by TaskRunner each time an implementing task is created.
-   * @param config Allows accessing of fields in the configuration files that 
this StreamTask is specified in.
    * @param context Allows accessing of contextual data of this StreamTask.
    * @throws Exception Any exception types encountered during the execution of 
the processing task.
    */
-  void init(Config config, TaskContext context) throws Exception;
+  void init(Context context) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
deleted file mode 100644
index 007028a..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import java.util.Set;
-
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.scheduler.ScheduledCallback;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
-
-
-/**
- * A TaskContext provides resources about the {@link 
org.apache.samza.task.StreamTask}, particularly during
- * initialization in an {@link org.apache.samza.task.InitableTask}.
- * TODO this will be replaced by {@link org.apache.samza.context.TaskContext} 
in the near future by SAMZA-1714
- */
-public interface TaskContext {
-  MetricsRegistry getMetricsRegistry();
-
-  Set<SystemStreamPartition> getSystemStreamPartitions();
-
-  Object getStore(String name);
-
-  Table getTable(String tableId);
-
-  TaskName getTaskName();
-
-  SamzaContainerContext getSamzaContainerContext();
-
-  /**
-   * Set the starting offset for the given {@link 
org.apache.samza.system.SystemStreamPartition}. Offsets
-   * can only be set for a {@link 
org.apache.samza.system.SystemStreamPartition} assigned to this task
-   * (as returned by {@link #getSystemStreamPartitions()}); trying to set the 
offset for any other partition
-   * will have no effect.
-   *
-   * NOTE: this feature is experimental, and the API may change in a future 
release.
-   *
-   * @param ssp {@link org.apache.samza.system.SystemStreamPartition} whose 
offset should be set
-   * @param offset to set for the given {@link 
org.apache.samza.system.SystemStreamPartition}
-   *
-   */
-  void setStartingOffset(SystemStreamPartition ssp, String offset);
-
-  /**
-   * Sets the user-defined context.
-   *
-   * @param context the user-defined context to set
-   */
-  default void setUserContext(Object context) { }
-
-  /**
-   * Gets the user-defined context.
-   *
-   * @return the user-defined context if set, else null
-   */
-  default Object getUserContext() {
-    return null;
-  }
-
-  /**
-   * Schedule the {@code callback} for the provided {@code key} to be invoked 
at epoch-time {@code timestamp}.
-   * The callback will be invoked exclusively with any other operations for 
this task,
-   * e.g. processing, windowing and commit.
-   * @param key key for the callback
-   * @param timestamp epoch time when the callback will be fired, in 
milliseconds
-   * @param callback callback to call when the {@code timestamp} is reached
-   * @param <K> type of the key
-   */
-  <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> 
callback);
-
-  /**
-   * Delete the scheduled {@code callback} for the {@code key}.
-   * Deletion only happens if the callback hasn't been fired. Otherwise it 
will not interrupt.
-   * @param key callback key
-   * @param <K> type of the key
-   */
-  <K> void deleteScheduledCallback(K key);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
index ad40d35..83532e4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
@@ -22,10 +22,8 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
 
 /**
  * A rate limiter interface used by Samza components to limit throughput of 
operations
@@ -53,10 +51,9 @@ public interface RateLimiter extends Serializable {
   /**
    * Initialize this rate limiter, this method should be called during 
container initialization.
    *
-   * @param config job configuration
-   * @param taskContext task context that owns this rate limiter
+   * @param context {@link Context} that corresponds to this rate limiter
    */
-  void init(Config config, TaskContext taskContext);
+  void init(Context context);
 
   /**
    * Attempt to acquire the provided number of credits, blocks indefinitely 
until

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
index b58d5a5..5416af5 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
@@ -25,8 +25,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
@@ -38,7 +41,6 @@ import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +49,8 @@ import org.slf4j.LoggerFactory;
  * This is the base class that implements interface {@link 
ApplicationDescriptor}.
  * <p>
  * This base class contains the common objects that are used by both 
high-level and low-level API applications, such as
- * {@link Config}, {@link ContextManager}, and {@link 
ProcessorLifecycleListenerFactory}.
+ * {@link Config}, {@link ApplicationContainerContextFactory}, {@link 
ApplicationTaskContextFactory}, and
+ * {@link ProcessorLifecycleListenerFactory}.
  *
  * @param <S> the type of {@link ApplicationDescriptor} interface this 
implements. It has to be either
  *            {@link StreamApplicationDescriptor} or {@link 
TaskApplicationDescriptor}
@@ -64,17 +67,8 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
   final Config config;
 
-  // Default to no-op functions in ContextManager
-  // TODO: this should be replaced by shared context factory defined in 
SAMZA-1714
-  ContextManager contextManager = new ContextManager() {
-    @Override
-    public void init(Config config, TaskContext context) {
-    }
-
-    @Override
-    public void close() {
-    }
-  };
+  private Optional<ApplicationContainerContextFactory<?>> 
applicationContainerContextFactoryOptional = Optional.empty();
+  private Optional<ApplicationTaskContextFactory<?>> 
applicationTaskContextFactoryOptional = Optional.empty();
 
   // Default to no-op  ProcessorLifecycleListenerFactory
   ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new 
ProcessorLifecycleListener() { };
@@ -90,8 +84,14 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   @Override
-  public S withContextManager(ContextManager contextManager) {
-    this.contextManager = contextManager;
+  public S 
withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> 
factory) {
+    this.applicationContainerContextFactoryOptional = Optional.of(factory);
+    return (S) this;
+  }
+
+  @Override
+  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> 
factory) {
+    this.applicationTaskContextFactoryOptional = Optional.of(factory);
     return (S) this;
   }
 
@@ -118,12 +118,27 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   /**
-   * Get the {@link ContextManager} associated with this application
+   * Get the {@link ApplicationContainerContextFactory} specified by the 
application.
+   *
+   * @return {@link ApplicationContainerContextFactory} if application 
specified it; empty otherwise
+   */
+  public 
Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> 
getApplicationContainerContextFactory() {
+    @SuppressWarnings("unchecked") // ok because all context types are at 
least ApplicationContainerContext
+    Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> 
factoryOptional =
+        (Optional) this.applicationContainerContextFactoryOptional;
+    return factoryOptional;
+  }
+
+  /**
+   * Get the {@link ApplicationTaskContextFactory} specified by the 
application.
    *
-   * @return the {@link ContextManager} for this application
+   * @return {@link ApplicationTaskContextFactory} if application specified 
it; empty otherwise
    */
-  public ContextManager getContextManager() {
-    return contextManager;
+  public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> 
getApplicationTaskContextFactory() {
+    @SuppressWarnings("unchecked") // ok because all context types are at 
least ApplicationTaskContext
+    Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> 
factoryOptional =
+        (Optional) this.applicationTaskContextFactoryOptional;
+    return factoryOptional;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
deleted file mode 100644
index 25ffe8f..0000000
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.function.Function;
-import org.apache.samza.checkpoint.OffsetManager;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableManager;
-import org.apache.samza.task.EpochTimeScheduler;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.scheduler.ScheduledCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-
-/**
- * TODO this will be replaced by {@link 
org.apache.samza.context.TaskContextImpl} in the near future by SAMZA-1714
- */
-public class TaskContextImpl implements TaskContext {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TaskContextImpl.class);
-
-  private final TaskName taskName;
-  private final TaskInstanceMetrics metrics;
-  private final SamzaContainerContext containerContext;
-  private final Set<SystemStreamPartition> systemStreamPartitions;
-  private final OffsetManager offsetManager;
-  private final Function<String, KeyValueStore> kvStoreSupplier;
-  private final TableManager tableManager;
-  private final JobModel jobModel;
-  private final StreamMetadataCache streamMetadataCache;
-  private final Map<String, Object> objectRegistry = new HashMap<>();
-  private final EpochTimeScheduler timerScheduler;
-
-  private Object userContext = null;
-
-  public TaskContextImpl(TaskName taskName,
-                         TaskInstanceMetrics metrics,
-                         SamzaContainerContext containerContext,
-                         Set<SystemStreamPartition> systemStreamPartitions,
-                         OffsetManager offsetManager,
-                         Function<String, KeyValueStore> kvStoreSupplier,
-                         TableManager tableManager,
-                         JobModel jobModel,
-                         StreamMetadataCache streamMetadataCache,
-                         ScheduledExecutorService timerExecutor) {
-    this.taskName = taskName;
-    this.metrics = metrics;
-    this.containerContext = containerContext;
-    this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
-    this.offsetManager = offsetManager;
-    this.kvStoreSupplier = kvStoreSupplier;
-    this.tableManager = tableManager;
-    this.jobModel = jobModel;
-    this.streamMetadataCache = streamMetadataCache;
-    this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
-  }
-
-  @Override
-  public ReadableMetricsRegistry getMetricsRegistry() {
-    return metrics.registry();
-  }
-
-  @Override
-  public Set<SystemStreamPartition> getSystemStreamPartitions() {
-    return systemStreamPartitions;
-  }
-
-  @Override
-  public KeyValueStore getStore(String storeName) {
-    KeyValueStore store = kvStoreSupplier.apply(storeName);
-    if (store == null) {
-      LOG.warn("No store found for name: {}", storeName);
-    }
-    return store;
-  }
-
-  @Override
-  public Table getTable(String tableId) {
-    if (tableManager != null) {
-      return tableManager.getTable(tableId);
-    } else {
-      LOG.warn("No table manager found");
-      return null;
-    }
-  }
-
-  @Override
-  public TaskName getTaskName() {
-    return taskName;
-  }
-
-  @Override
-  public SamzaContainerContext getSamzaContainerContext() {
-    return containerContext;
-  }
-
-  @Override
-  public void setStartingOffset(SystemStreamPartition ssp, String offset) {
-    offsetManager.setStartingOffset(taskName, ssp, offset);
-  }
-
-  @Override
-  public void setUserContext(Object context) {
-    userContext = context;
-  }
-
-  @Override
-  public Object getUserContext() {
-    return userContext;
-  }
-
-  @Override
-  public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> 
callback) {
-    timerScheduler.setTimer(key, timestamp, callback);
-  }
-
-  @Override
-  public <K> void deleteScheduledCallback(K key) {
-    timerScheduler.deleteTimer(key);
-  }
-
-  public void registerObject(String name, Object value) {
-    objectRegistry.put(name, value);
-  }
-
-  public Object fetchObject(String name) {
-    return objectRegistry.get(name);
-  }
-
-  public JobModel getJobModel() {
-    return jobModel;
-  }
-
-  public StreamMetadataCache getStreamMetadataCache() {
-    return streamMetadataCache;
-  }
-
-  public EpochTimeScheduler getTimerScheduler() {
-    return timerScheduler;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
index c2c182f..93c7eb1 100644
--- a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
@@ -18,27 +18,36 @@
  */
 package org.apache.samza.context;
 
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+import java.util.Optional;
+
+
 public class ContextImpl implements Context {
   private final JobContext jobContext;
   private final ContainerContext containerContext;
   private final TaskContext taskContext;
-  private final ApplicationContainerContext applicationContainerContext;
-  private final ApplicationTaskContext applicationTaskContext;
+  private final Optional<ApplicationContainerContext> 
applicationContainerContextOptional;
+  private final Optional<ApplicationTaskContext> 
applicationTaskContextOptional;
 
   /**
    * @param jobContext non-null job context
    * @param containerContext non-null framework container context
    * @param taskContext non-null framework task context
-   * @param applicationContainerContext nullable application-defined container 
context
-   * @param applicationTaskContext nullable application-defined task context
+   * @param applicationContainerContextOptional optional application-defined 
container context
+   * @param applicationTaskContextOptional optional application-defined task 
context
    */
-  public ContextImpl(JobContext jobContext, ContainerContext containerContext, 
TaskContext taskContext,
-      ApplicationContainerContext applicationContainerContext, 
ApplicationTaskContext applicationTaskContext) {
-    this.jobContext = jobContext;
-    this.containerContext = containerContext;
-    this.taskContext = taskContext;
-    this.applicationContainerContext = applicationContainerContext;
-    this.applicationTaskContext = applicationTaskContext;
+  public ContextImpl(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskContext taskContext,
+      Optional<ApplicationContainerContext> 
applicationContainerContextOptional,
+      Optional<ApplicationTaskContext> applicationTaskContextOptional) {
+    this.jobContext = Preconditions.checkNotNull(jobContext, "Job context can 
not be null");
+    this.containerContext = Preconditions.checkNotNull(containerContext, 
"Container context can not be null");
+    this.taskContext = Preconditions.checkNotNull(taskContext, "Task context 
can not be null");
+    this.applicationContainerContextOptional = 
applicationContainerContextOptional;
+    this.applicationTaskContextOptional = applicationTaskContextOptional;
   }
 
   @Override
@@ -58,17 +67,38 @@ public class ContextImpl implements Context {
 
   @Override
   public ApplicationContainerContext getApplicationContainerContext() {
-    if (this.applicationContainerContext == null) {
+    if (!this.applicationContainerContextOptional.isPresent()) {
       throw new IllegalStateException("No application-defined container 
context exists");
     }
-    return this.applicationContainerContext;
+    return this.applicationContainerContextOptional.get();
   }
 
   @Override
   public ApplicationTaskContext getApplicationTaskContext() {
-    if (this.applicationTaskContext == null) {
+    if (!this.applicationTaskContextOptional.isPresent()) {
       throw new IllegalStateException("No application-defined task context 
exists");
     }
-    return this.applicationTaskContext;
+    return this.applicationTaskContextOptional.get();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ContextImpl context = (ContextImpl) o;
+    return Objects.equals(jobContext, context.jobContext) && 
Objects.equals(containerContext, context.containerContext)
+        && Objects.equals(taskContext, context.taskContext) && 
Objects.equals(applicationContainerContextOptional,
+        context.applicationContainerContextOptional) && 
Objects.equals(applicationTaskContextOptional,
+        context.applicationTaskContextOptional);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(jobContext, containerContext, taskContext, 
applicationContainerContextOptional,
+        applicationTaskContextOptional);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
index 8fe44e4..797e2ca 100644
--- a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
@@ -19,6 +19,8 @@
 package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import scala.Option;
 
 
 public class JobContextImpl implements JobContext {
@@ -26,12 +28,30 @@ public class JobContextImpl implements JobContext {
   private final String jobName;
   private final String jobId;
 
-  public JobContextImpl(Config config, String jobName, String jobId) {
+  private JobContextImpl(Config config, String jobName, String jobId) {
     this.config = config;
     this.jobName = jobName;
     this.jobId = jobId;
   }
 
+  /**
+   * Build a {@link JobContextImpl} from a {@link Config} object.
+   * This extracts some information like job name and job id.
+   *
+   * @param config used to extract job information
+   * @return {@link JobContextImpl} corresponding to the {@code config}
+   * @throws IllegalArgumentException if job name is not defined in the {@code 
config}
+   */
+  public static JobContextImpl fromConfigWithDefaults(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option<String> jobName = jobConfig.getName();
+    if (jobName.isEmpty()) {
+      throw new IllegalArgumentException("Job name is not defined in 
configuration");
+    }
+    String jobId = jobConfig.getJobId();
+    return new JobContextImpl(config, jobName.get(), jobId);
+  }
+
   @Override
   public Config getConfig() {
     return this.config;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index e975dcd..ec52f8a 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -18,16 +18,21 @@
  */
 package org.apache.samza.context;
 
-import java.util.function.Function;
 import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableManager;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
 
 public class TaskContextImpl implements TaskContext {
   private final TaskModel taskModel;
@@ -36,19 +41,26 @@ public class TaskContextImpl implements TaskContext {
   private final TableManager tableManager;
   private final CallbackScheduler callbackScheduler;
   private final OffsetManager offsetManager;
+  private final JobModel jobModel;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<String, Object> objectRegistry = new HashMap<>();
 
   public TaskContextImpl(TaskModel taskModel,
       MetricsRegistry taskMetricsRegistry,
       Function<String, KeyValueStore> keyValueStoreProvider,
       TableManager tableManager,
       CallbackScheduler callbackScheduler,
-      OffsetManager offsetManager) {
+      OffsetManager offsetManager,
+      JobModel jobModel,
+      StreamMetadataCache streamMetadataCache) {
     this.taskModel = taskModel;
     this.taskMetricsRegistry = taskMetricsRegistry;
     this.keyValueStoreProvider = keyValueStoreProvider;
     this.tableManager = tableManager;
     this.callbackScheduler = callbackScheduler;
     this.offsetManager = offsetManager;
+    this.jobModel = jobModel;
+    this.streamMetadataCache = streamMetadataCache;
   }
 
   @Override
@@ -84,4 +96,22 @@ public class TaskContextImpl implements TaskContext {
   public void setStartingOffset(SystemStreamPartition systemStreamPartition, 
String offset) {
     this.offsetManager.setStartingOffset(this.taskModel.getTaskName(), 
systemStreamPartition, offset);
   }
+
+  // TODO SAMZA-1935: below methods are used by operator code; they should be 
decoupled from this client API
+
+  public void registerObject(String name, Object value) {
+    this.objectRegistry.put(name, value);
+  }
+
+  public Object fetchObject(String name) {
+    return this.objectRegistry.get(name);
+  }
+
+  public JobModel getJobModel() {
+    return this.jobModel;
+  }
+
+  public StreamMetadataCache getStreamMetadataCache() {
+    return this.streamMetadataCache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
index 99ed089..4965f7b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.spec.BroadcastOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.ControlMessage;
@@ -28,7 +28,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -40,14 +39,14 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final SystemStream systemStream;
   private final String taskName;
 
-  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream 
systemStream, TaskContext context) {
+  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream 
systemStream, Context context) {
     this.broadcastOpSpec = broadcastOpSpec;
     this.systemStream = systemStream;
-    this.taskName = context.getTaskName().getTaskName();
+    this.taskName = 
context.getTaskContext().getTaskModel().getTaskName().getTaskName();
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
+  protected void handleInit(Context context) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index 6cc57e0..2a73064 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -18,14 +18,13 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.InputTransformer;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -44,7 +43,7 @@ public final class InputOperatorImpl extends OperatorImpl<IncomingMessageEnvelop
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
+  protected void handleInit(Context context) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 5cafd26..675b211 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -21,23 +21,23 @@ package org.apache.samza.operators.impl;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.Scheduler;
-import org.apache.samza.operators.functions.ScheduledFunction;
-import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.scheduler.CallbackScheduler;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
@@ -82,16 +82,15 @@ public abstract class OperatorImpl<M, RM> {
   private EndOfStreamStates eosStates;
   // watermark states
   private WatermarkStates watermarkStates;
-  private TaskContext taskContext;
+  private CallbackScheduler callbackScheduler;
   private ControlMessageSender controlMessageSender;
 
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
    *
-   * @param config  the {@link Config} for the task
-   * @param context  the {@link TaskContext} for the task
+   * @param context the {@link Context} for the task
    */
-  public final void init(Config config, TaskContext context) {
+  public final void init(Context context) {
     String opId = getOpImplId();
 
     if (initialized) {
@@ -102,32 +101,24 @@ public abstract class OperatorImpl<M, RM> {
       throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opId));
     }
 
-    this.highResClock = createHighResClock(config);
+    this.highResClock = createHighResClock(context.getJobContext().getConfig());
     registeredOperators = new HashSet<>();
     prevOperators = new HashSet<>();
     inputStreams = new HashSet<>();
-    MetricsRegistry metricsRegistry = context.getMetricsRegistry();
+    // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
+    TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
+    MetricsRegistry metricsRegistry = taskContext.getTaskMetricsRegistry();
     this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages");
     this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns");
     this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns");
-    this.taskName = context.getTaskName();
+    this.taskName = taskContext.getTaskModel().getTaskName();
 
-    TaskContextImpl taskContext = (TaskContextImpl) context;
     this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
     this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
     this.controlMessageSender = new ControlMessageSender(taskContext.getStreamMetadataCache());
-
-    if (taskContext.getJobModel() != null) {
-      ContainerModel containerModel = taskContext.getJobModel().getContainers()
-          .get(context.getSamzaContainerContext().id);
-      this.taskModel = containerModel.getTasks().get(taskName);
-    } else {
-      this.taskModel = null;
-      this.usedInCurrentTask = true;
-    }
-
-    this.taskContext = taskContext;
-    handleInit(config, taskContext);
+    this.taskModel = taskContext.getTaskModel();
+    this.callbackScheduler = taskContext.getCallbackScheduler();
+    handleInit(context);
 
     initialized = true;
   }
@@ -135,10 +126,9 @@ public abstract class OperatorImpl<M, RM> {
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
    *
-   * @param config  the {@link Config} for the task
-   * @param context  the {@link TaskContext} for the task
+   * @param context the {@link Context} for the task
    */
-  protected abstract void handleInit(Config config, TaskContext context);
+  protected abstract void handleInit(Context context);
 
   /**
    * Register an operator that this operator should propagate its results to.
@@ -448,7 +438,7 @@ public abstract class OperatorImpl<M, RM> {
     return new Scheduler<K>() {
       @Override
       public void schedule(K key, long time) {
-        taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
+        callbackScheduler.scheduleCallback(key, time, (k, collector, coordinator) -> {
             final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn();
             if (scheduledFn != null) {
               final Collection<RM> output = scheduledFn.onCallback(key, time);
@@ -468,7 +458,7 @@ public abstract class OperatorImpl<M, RM> {
 
       @Override
       public void delete(K key) {
-        taskContext.deleteScheduledCallback(key);
+        callbackScheduler.deleteCallback(key);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index d76c7de..e668b91 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -23,20 +23,18 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.BroadcastOperatorSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SendToTableOperatorSpec;
@@ -46,8 +44,8 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.TimestampedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,14 +91,14 @@ public class OperatorImplGraph {
    * in the {@code specGraph}.
    *
    * @param specGraph  the {@link OperatorSpecGraph} containing the logical {@link OperatorSpec} DAG
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
+   * @param context  the {@link Context} required to instantiate operators
    * @param clock  the {@link Clock} to get current time
    */
-  public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, TaskContext context, Clock clock) {
+  public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) {
     this.clock = clock;
-    StreamConfig streamConfig = new StreamConfig(config);
-    TaskContextImpl taskContext = (TaskContextImpl) context;
+    StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig());
+    // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
+    TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
     Map<SystemStream, Integer> producerTaskCounts =
         hasIntermediateStreams(specGraph)
             ? getProducerTaskCountForIntermediateStreams(
@@ -113,15 +111,16 @@ public class OperatorImplGraph {
 
     // set states for end-of-stream
     taskContext.registerObject(EndOfStreamStates.class.getName(),
-        new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts));
+        new EndOfStreamStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts));
     // set states for watermark
     taskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts, getMetricsRegistry(context)));
+        new WatermarkStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts,
+            context.getContainerContext().getContainerMetricsRegistry()));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
         SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
         InputOperatorImpl inputOperatorImpl =
-            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
+            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
       });
   }
@@ -158,18 +157,16 @@ public class OperatorImplGraph {
    * @param prevOperatorSpec  the parent of the current {@code operatorSpec} in the traversal
    * @param operatorSpec  the {@link OperatorSpec} to create the {@link OperatorImpl} for
    * @param inputStream  the source input stream that we traverse the {@link OperatorSpecGraph} from
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
+   * @param context the {@link Context} required to instantiate operators
    * @return  the operator implementation for the operatorSpec
    */
   private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
-      SystemStream inputStream, Config config, TaskContext context) {
-
+      SystemStream inputStream, Context context) {
     if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
-      OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
-      operatorImpl.init(config, context);
+      OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, context);
+      operatorImpl.init(context);
       operatorImpl.registerInputStream(inputStream);
 
       if (operatorSpec.getScheduledFn() != null) {
@@ -185,8 +182,7 @@ public class OperatorImplGraph {
       Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
       registeredSpecs.forEach(registeredSpec -> {
           LOG.debug("Creating operator {} with opCode: {}", registeredSpec.getOpId(), registeredSpec.getOpCode());
-          OperatorImpl nextImpl =
-              createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
+          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, context);
           operatorImpl.registerNextOperator(nextImpl);
         });
       return operatorImpl;
@@ -197,9 +193,8 @@ public class OperatorImplGraph {
 
       // We still need to traverse the DAG further to register the input streams.
       Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
-      registeredSpecs.forEach(registeredSpec -> {
-          createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
-        });
+      registeredSpecs.forEach(
+          registeredSpec -> createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, context));
       return operatorImpl;
     }
   }
@@ -209,19 +204,18 @@ public class OperatorImplGraph {
    *
    * @param prevOperatorSpec the original {@link OperatorSpec} that produces output for {@code operatorSpec} from {@link OperatorSpecGraph}
    * @param operatorSpec  the original {@link OperatorSpec} from {@link OperatorSpecGraph}
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
+   * @param context  the {@link Context} required to instantiate operators
    * @return  the {@link OperatorImpl} implementation instance
    */
-  OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
-      Config config, TaskContext context) {
+  OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, Context context) {
+    Config config = context.getJobContext().getConfig();
     StreamConfig streamConfig = new StreamConfig(config);
     if (operatorSpec instanceof InputOperatorSpec) {
       return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof StreamOperatorSpec) {
       return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof SinkOperatorSpec) {
-      return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
+      return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
       String streamId = ((OutputOperatorSpec) operatorSpec).getOutputStream().getStreamId();
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
@@ -234,12 +228,11 @@ public class OperatorImplGraph {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
       return getOrCreatePartialJoinOpImpls((JoinOperatorSpec) operatorSpec,
-          prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()),
-          config, context, clock);
+          prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()), clock);
     } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) {
-      return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, config, context);
+      return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, context);
     } else if (operatorSpec instanceof SendToTableOperatorSpec) {
-      return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context);
+      return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, context);
     } else if (operatorSpec instanceof BroadcastOperatorSpec) {
       String streamId = ((BroadcastOperatorSpec) operatorSpec).getOutputStream().getStreamId();
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
@@ -250,14 +243,14 @@ public class OperatorImplGraph {
   }
 
   private PartialJoinOperatorImpl getOrCreatePartialJoinOpImpls(JoinOperatorSpec joinOpSpec, boolean isLeft,
-      Config config, TaskContext context, Clock clock) {
+      Clock clock) {
     // get the per task pair of PartialJoinOperatorImpl for the corresponding {@code joinOpSpec}
     KV<PartialJoinOperatorImpl, PartialJoinOperatorImpl> partialJoinOpImpls = joinOpImpls.computeIfAbsent(joinOpSpec.getOpId(),
         joinOpId -> {
         PartialJoinFunction leftJoinFn = createLeftJoinFn(joinOpSpec);
         PartialJoinFunction rightJoinFn = createRightJoinFn(joinOpSpec);
-        return new KV(new PartialJoinOperatorImpl(joinOpSpec, true, leftJoinFn, rightJoinFn, config, context, clock),
-            new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, leftJoinFn, config, context, clock));
+        return new KV(new PartialJoinOperatorImpl(joinOpSpec, true, leftJoinFn, rightJoinFn, clock),
+            new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, leftJoinFn, clock));
       });
 
     if (isLeft) { // we got here from the left side of the join
@@ -288,12 +281,13 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public void init(Config config, TaskContext context) {
+      public void init(Context context) {
         String leftStoreName = joinOpSpec.getLeftOpId();
-        leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName);
+        leftStreamState =
+            (KeyValueStore<Object, TimestampedValue<Object>>) context.getTaskContext().getStore(leftStoreName);
 
         // user-defined joinFn should only be initialized once, so we do it only in left partial join function.
-        joinFn.init(config, context);
+        joinFn.init(context);
       }
 
       @Override
@@ -320,9 +314,10 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public void init(Config config, TaskContext context) {
+      public void init(Context context) {
         String rightStoreName = joinOpSpec.getRightOpId();
-        rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName);
+        rightStreamState =
+            (KeyValueStore<Object, TimestampedValue<Object>>) context.getTaskContext().getStore(rightStoreName);
 
         // user-defined joinFn should only be initialized once,
         // so we do it only in left partial join function and not here again.
@@ -405,9 +400,4 @@ public class OperatorImplGraph {
   private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
     return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
   }
-
-  private static MetricsRegistry getMetricsRegistry(TaskContext context) {
-    final SamzaContainerContext containerContext = context.getSamzaContainerContext();
-    return containerContext != null ? containerContext.metricsRegistry : context.getMetricsRegistry();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index 22fbb1b..407cdd9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
@@ -26,7 +26,6 @@ import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -49,7 +48,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
+  protected void handleInit(Context context) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index 0cdde49..55658eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -19,16 +19,15 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.TimestampedValue;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -54,7 +53,7 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
   PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean isLeftSide,
       PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn,
       PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn,
-      Config config, TaskContext context, Clock clock) {
+      Clock clock) {
     this.joinOpSpec = joinOpSpec;
     this.isLeftSide = isLeftSide;
     this.thisPartialJoinFn = thisPartialJoinFn;
@@ -64,8 +63,8 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
-    this.thisPartialJoinFn.init(config, context);
+  protected void handleInit(Context context) {
+    this.thisPartialJoinFn.init(context);
   }
 
   @Override

Reply via email to