SAMZA-1714: Creating shared context factory for shared context objects <s>This includes changes in https://github.com/apache/samza/pull/626.</s> Update: PR #626 has been merged, so the diff here should no longer show those changes.
Author: Cameron Lee <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #672 from cameronlee314/shared_context_impl Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d2d49e9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d2d49e9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d2d49e9 Branch: refs/heads/master Commit: 9d2d49e9e987699f053a46eb12a64cfe8cc42907 Parents: d2c9e81 Author: Cameron Lee <[email protected]> Authored: Wed Oct 10 15:34:19 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Oct 10 15:34:19 2018 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 4 +- .../versioned/hello-samza-high-level-code.md | 2 +- .../application/ApplicationDescriptor.java | 30 +++- .../samza/container/SamzaContainerContext.java | 55 ------ .../context/ApplicationContainerContext.java | 7 +- .../samza/context/ApplicationTaskContext.java | 4 + .../org/apache/samza/context/JobContext.java | 1 + .../apache/samza/operators/ContextManager.java | 49 ------ .../operators/functions/InitableFunction.java | 9 +- .../samza/scheduler/CallbackScheduler.java | 1 + .../samza/scheduler/ScheduledCallback.java | 5 +- .../samza/storage/StorageEngineFactory.java | 8 +- .../org/apache/samza/table/ReadableTable.java | 9 +- .../org/apache/samza/table/TableProvider.java | 9 +- .../org/apache/samza/task/InitableTask.java | 6 +- .../java/org/apache/samza/task/TaskContext.java | 98 ----------- .../java/org/apache/samza/util/RateLimiter.java | 9 +- .../application/ApplicationDescriptorImpl.java | 55 +++--- .../apache/samza/container/TaskContextImpl.java | 169 ------------------- .../org/apache/samza/context/ContextImpl.java | 60 +++++-- .../apache/samza/context/JobContextImpl.java | 22 ++- .../apache/samza/context/TaskContextImpl.java | 34 +++- .../operators/impl/BroadcastOperatorImpl.java | 9 +- 
.../samza/operators/impl/InputOperatorImpl.java | 5 +- .../samza/operators/impl/OperatorImpl.java | 54 +++--- .../samza/operators/impl/OperatorImplGraph.java | 84 ++++----- .../operators/impl/OutputOperatorImpl.java | 5 +- .../operators/impl/PartialJoinOperatorImpl.java | 11 +- .../operators/impl/PartitionByOperatorImpl.java | 17 +- .../operators/impl/SendToTableOperatorImpl.java | 15 +- .../samza/operators/impl/SinkOperatorImpl.java | 9 +- .../operators/impl/StreamOperatorImpl.java | 7 +- .../impl/StreamTableJoinOperatorImpl.java | 18 +- .../operators/impl/WindowOperatorImpl.java | 15 +- .../operators/spec/FilterOperatorSpec.java | 7 +- .../samza/operators/spec/MapOperatorSpec.java | 7 +- .../apache/samza/processor/StreamProcessor.java | 78 ++++++--- .../samza/runtime/LocalApplicationRunner.java | 3 +- .../samza/runtime/LocalContainerRunner.java | 11 +- .../apache/samza/storage/StorageRecovery.java | 9 +- .../org/apache/samza/table/TableManager.java | 18 +- .../samza/table/caching/CachingTable.java | 28 ++- .../table/caching/CachingTableProvider.java | 8 +- .../table/caching/guava/GuavaCacheTable.java | 18 +- .../caching/guava/GuavaCacheTableProvider.java | 2 +- .../table/remote/RemoteReadWriteTable.java | 24 ++- .../samza/table/remote/RemoteReadableTable.java | 30 ++-- .../samza/table/remote/RemoteTableProvider.java | 26 +-- .../samza/table/utils/BaseTableProvider.java | 11 +- .../table/utils/DefaultTableReadMetrics.java | 11 +- .../table/utils/DefaultTableWriteMetrics.java | 11 +- .../samza/table/utils/TableMetricsUtil.java | 21 +-- .../org/apache/samza/task/AsyncRunLoop.java | 5 +- .../samza/task/AsyncStreamTaskAdapter.java | 6 +- .../apache/samza/task/StreamOperatorTask.java | 31 +--- .../org/apache/samza/task/TaskFactoryUtil.java | 4 +- .../samza/util/EmbeddedTaggedRateLimiter.java | 30 ++-- .../apache/samza/container/SamzaContainer.scala | 67 +++++--- .../apache/samza/container/TaskInstance.scala | 53 ++++-- .../samza/job/local/ThreadJobFactory.scala | 8 +- 
.../TestStreamApplicationDescriptorImpl.java | 37 +++- .../TestTaskApplicationDescriptorImpl.java | 36 +++- .../org/apache/samza/context/MockContext.java | 73 ++++++++ .../apache/samza/context/TestContextImpl.java | 12 +- .../samza/context/TestTaskContextImpl.java | 15 +- .../TestJobNodeConfigurationGenerator.java | 22 +-- .../samza/operators/TestJoinOperator.java | 29 ++-- .../samza/operators/impl/TestOperatorImpl.java | 52 +++--- .../operators/impl/TestOperatorImplGraph.java | 137 +++++++-------- .../operators/impl/TestSinkOperatorImpl.java | 7 +- .../operators/impl/TestStreamOperatorImpl.java | 6 - .../impl/TestStreamTableJoinOperatorImpl.java | 17 +- .../operators/impl/TestWindowOperator.java | 114 +++++++------ .../samza/operators/spec/TestOperatorSpec.java | 2 +- .../spec/TestPartitionByOperatorSpec.java | 2 +- .../operators/spec/TestWindowOperatorSpec.java | 9 +- .../samza/processor/TestStreamProcessor.java | 9 +- .../samza/storage/MockStorageEngineFactory.java | 16 +- .../apache/samza/table/TestTableManager.java | 18 +- .../samza/table/caching/TestCachingTable.java | 48 +++--- .../samza/table/remote/TestRemoteTable.java | 39 ++--- .../table/remote/TestRemoteTableDescriptor.java | 41 +++-- .../retry/TestRetriableTableFunctions.java | 12 +- .../apache/samza/task/IdentityStreamTask.java | 6 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 44 ++++- .../samza/task/TestAsyncStreamAdapter.java | 6 +- .../samza/task/TestEpochTimeScheduler.java | 3 +- .../samza/task/TestStreamOperatorTask.java | 30 ++-- .../util/TestEmbeddedTaggedRateLimiter.java | 48 +++--- .../samza/container/TestSamzaContainer.scala | 51 ++++-- .../samza/container/TestTaskInstance.scala | 90 +++++++--- .../processor/StreamProcessorTestUtils.scala | 31 ++-- .../InMemoryKeyValueStorageEngineFactory.scala | 13 +- .../samza/storage/kv/RocksDbKeyValueReader.java | 11 +- .../samza/storage/kv/RocksDbOptionsHelper.java | 15 +- .../RocksDbKeyValueStorageEngineFactory.scala | 23 +-- 
.../storage/kv/TestRocksDbTableDescriptor.java | 3 +- .../kv/BaseLocalStoreBackedTableProvider.java | 18 +- .../kv/LocalStoreBackedReadWriteTable.java | 10 +- .../kv/LocalStoreBackedReadableTable.java | 10 +- .../kv/BaseKeyValueStorageEngineFactory.scala | 41 ++--- .../TestBaseLocalStoreBackedTableProvider.java | 18 +- .../sql/runner/SamzaSqlApplicationContext.java | 44 +++++ .../samza/sql/translator/FilterTranslator.java | 9 +- .../samza/sql/translator/ModifyTranslator.java | 11 +- .../samza/sql/translator/ProjectTranslator.java | 8 +- .../samza/sql/translator/QueryTranslator.java | 49 +++--- .../samza/sql/translator/ScanTranslator.java | 11 +- .../apache/samza/sql/e2e/TestSamzaSqlTable.java | 1 - .../runner/TestSamzaSqlApplicationRunner.java | 2 - .../samza/sql/system/TestAvroSystemFactory.java | 1 - .../sql/testutil/TestIOResolverFactory.java | 1 - .../sql/testutil/TestSamzaSqlFileParser.java | 2 - .../sql/translator/TestFilterTranslator.java | 16 +- .../sql/translator/TestProjectTranslator.java | 25 +-- .../sql/translator/TestQueryTranslator.java | 47 +++--- .../samza/example/KeyValueStoreExample.java | 7 +- .../test/framework/MessageStreamAssert.java | 15 +- .../test/integration/NegateNumberTask.java | 9 +- .../test/integration/SimpleStatefulTask.java | 13 +- .../test/integration/StatePerfTestTask.java | 9 +- .../samza/test/integration/join/Checker.java | 19 +-- .../samza/test/integration/join/Emitter.java | 23 ++- .../samza/test/integration/join/Joiner.java | 26 ++- .../samza/test/integration/join/Watcher.java | 17 +- .../performance/TestKeyValuePerformance.scala | 21 ++- .../test/performance/TestPerformanceTask.scala | 19 +-- .../processor/TestZkStreamProcessorBase.java | 6 +- .../samza/test/framework/TestSchedulingApp.java | 2 +- .../test/processor/IdentityStreamTask.java | 5 +- .../test/processor/TestStreamProcessor.java | 5 +- .../apache/samza/test/table/TestLocalTable.java | 33 ++-- .../table/TestLocalTableWithSideInputs.java | 23 +-- 
.../samza/test/table/TestRemoteTable.java | 27 +-- .../table/TestTableDescriptorsProvider.java | 3 +- .../test/integration/StreamTaskTestUtil.scala | 19 ++- .../integration/TestShutdownStatefulTask.scala | 8 +- .../test/integration/TestStatefulTask.scala | 8 +- 138 files changed, 1598 insertions(+), 1592 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 35ddcab..44d43f3 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1626,9 +1626,9 @@ store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span> <em>default</em> is reserved for defining default store parameters), and use that name to get a reference to the store in your stream task (call - <a href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a> + <a href="../api/javadocs/org/apache/samza/context/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a> in your task's - <a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config, org.apache.samza.task.TaskContext)">init()</a> + <a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.context.Context)">init()</a> method). The value of this property is the fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>. 
Samza currently ships with one storage engine implementation: http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/docs/learn/tutorials/versioned/hello-samza-high-level-code.md ---------------------------------------------------------------------- diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md index 2f6a4a6..1c06116 100644 --- a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md +++ b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md @@ -357,7 +357,7 @@ To use the store in the application, we need to get it from the [TaskContext](/l private KeyValueStore<String, Integer> store; {% endhighlight %} -Then override the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.config.Config-org.apache.samza.task.TaskContext-) method in `WikipediaStatsAggregator` to initialize the store. +Then override the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-) method in `WikipediaStatsAggregator` to initialize the store. 
{% highlight java %} @Override public void init(Config config, TaskContext context) { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java index 178fdee..e806aad 100644 --- a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java @@ -21,8 +21,9 @@ package org.apache.samza.application; import java.util.Map; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; +import org.apache.samza.context.ApplicationContainerContextFactory; +import org.apache.samza.context.ApplicationTaskContextFactory; import org.apache.samza.metrics.MetricsReporterFactory; -import org.apache.samza.operators.ContextManager; import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; @@ -44,17 +45,30 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> { Config getConfig(); /** - * Sets the {@link ContextManager} for this application. + * Sets the {@link ApplicationContainerContextFactory} for this application. Each task will be given access to a + * different instance of the {@link org.apache.samza.context.ApplicationContainerContext} that this creates. The + * context can be accessed through the {@link org.apache.samza.context.Context}. * <p> - * Setting the {@link ContextManager} is optional. The provided {@link ContextManager} can be used to build the shared - * context between the operator functions within a task instance + * Setting this is optional. * - * TODO: this should be replaced by the shared context factory when SAMZA-1714 is fixed. 
+ * @param factory the {@link ApplicationContainerContextFactory} for this application + * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its + * {@link ApplicationContainerContextFactory} + */ + S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory); - * @param contextManager the {@link ContextManager} to use for the application - * @return type {@code S} of {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager} + /** + * Sets the {@link ApplicationTaskContextFactory} for this application. Each task will be given access to a different + * instance of the {@link org.apache.samza.context.ApplicationTaskContext} that this creates. The context can be + * accessed through the {@link org.apache.samza.context.Context}. + * <p> + * Setting this is optional. + * + * @param factory the {@link ApplicationTaskContextFactory} for this application + * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its + * {@link ApplicationTaskContextFactory} */ - S withContextManager(ContextManager contextManager); + S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory); /** * Sets the {@link ProcessorLifecycleListenerFactory} for this application. http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java deleted file mode 100644 index 6e13f7a..0000000 --- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.container; - -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistry; - -import java.util.Collection; -import java.util.Collections; - -/** - * A SamzaContainerContext maintains per-container information for the tasks it executes. - */ -public class SamzaContainerContext { - public final String id; - public final Config config; - public final Collection<TaskName> taskNames; - public final MetricsRegistry metricsRegistry; - - /** - * An immutable context object that can passed to tasks to give them information - * about the container in which they are executing. - * @param id The id of the container. - * @param config The job configuration. - * @param taskNames The set of taskName keys for which this container is responsible. 
- * @param metricsRegistry the {@link MetricsRegistry} for the container metrics - */ - public SamzaContainerContext( - String id, - Config config, - Collection<TaskName> taskNames, - MetricsRegistry metricsRegistry) { - this.id = id; - this.config = config; - this.taskNames = Collections.unmodifiableCollection(taskNames); - this.metricsRegistry = metricsRegistry; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java index 08e0b60..aab8c7f 100644 --- a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java +++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java @@ -20,11 +20,16 @@ package org.apache.samza.context; /** * An application should implement this to contain any runtime objects required by processing logic which can be shared - * across all tasks in a container. A single instance of this will be created in each container. + * across all tasks in a container. A single instance of this will be created in each container. Note that if the + * container moves or the container model changes (e.g. container failure/rebalancing), then this will be recreated. * <p> * This needs to be created by an implementation of {@link ApplicationContainerContextFactory}. The factory should * create the runtime objects contained within this context. * <p> + * This is related to {@link ContainerContext} in that they are both associated with the container lifecycle. In order + * to access this in application code, use {@link Context#getApplicationContainerContext()}. The + * {@link ContainerContext} is accessible through {@link Context#getContainerContext()}. 
+ * <p> * If it is necessary to have a separate instance per task, then use {@link ApplicationTaskContext} instead. * <p> * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments. http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java index ffc5383..6afbf23 100644 --- a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java @@ -25,6 +25,10 @@ package org.apache.samza.context; * This needs to be created by an implementation of {@link ApplicationTaskContextFactory}. The factory should create * the runtime objects contained within this context. * <p> + * This is related to {@link TaskContext} in that they are both associated with a task lifecycle. In order to access + * this in application code, use {@link Context#getApplicationTaskContext()}. The {@link TaskContext} is accessible + * through {@link Context#getTaskContext()}. + * <p> * If it is possible to share an instance of this across tasks in a container, then use * {@link ApplicationContainerContext} instead. 
* <p> http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/context/JobContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-api/src/main/java/org/apache/samza/context/JobContext.java index 9b09fa9..239a011 100644 --- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java +++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java @@ -35,6 +35,7 @@ public interface JobContext { /** * Returns the name of the job. * @return name of the job + * @throws org.apache.samza.SamzaException if the job name was not configured */ String getJobName(); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java deleted file mode 100644 index 5f2c020..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; - - -/** - * Manages custom context that is shared across multiple operator functions in a task. - */ -@InterfaceStability.Unstable -public interface ContextManager { - - /** - * Allows initializing and setting a custom context that is shared across multiple operator functions in a task. - * <p> - * This method is invoked before any {@link org.apache.samza.operators.functions.InitableFunction}s are initialized. - * Use {@link TaskContext#setUserContext(Object)} to set the context here and {@link TaskContext#getUserContext()} to - * get it in InitableFunctions. - * - * @param config the {@link Config} for the application - * @param context the {@link TaskContext} for this task - */ - void init(Config config, TaskContext context); - - /** - * Allows closing the custom context that is shared across multiple operator functions in a task.
- */ - void close(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java index 8a5d83b..7f950de 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -20,8 +20,7 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; +import org.apache.samza.context.Context; /** * A function that can be initialized before execution. @@ -33,12 +32,10 @@ import org.apache.samza.task.TaskContext; */ @InterfaceStability.Evolving public interface InitableFunction { - /** * Initializes the function before any messages are processed. 
* - * @param config the {@link Config} for the application - * @param context the {@link TaskContext} for this task + * @param context the {@link Context} for this task */ - default void init(Config config, TaskContext context) { } + default void init(Context context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java b/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java index e230304..5c8d77d 100644 --- a/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java +++ b/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java @@ -18,6 +18,7 @@ */ package org.apache.samza.scheduler; + /** * Provides a way for applications to register some logic to be executed at a future time. */ http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java index 8745422..546ca37 100644 --- a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java +++ b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java @@ -24,8 +24,9 @@ import org.apache.samza.task.TaskCoordinator; /** - * The callback that is invoked when its corresponding schedule time registered via - * {@link org.apache.samza.task.TaskContext} is reached. + * The callback that is invoked when its corresponding schedule time registered via {@link CallbackScheduler} is + * reached. 
The {@link CallbackScheduler} is available through + * {@link org.apache.samza.context.TaskContext#getCallbackScheduler()}. * @param <K> type of the callback key */ public interface ScheduledCallback<K> { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java index 800deeb..2425cf3 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java @@ -20,8 +20,8 @@ package org.apache.samza.storage; import java.io.File; - -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.Serde; import org.apache.samza.system.SystemStreamPartition; @@ -43,6 +43,7 @@ public interface StorageEngineFactory<K, V> { * @param collector MessageCollector the storage engine uses to persist changes. * @param registry MetricsRegistry to which to publish storage-engine specific metrics. * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog. + * @param jobContext Information about the job in which the task is executing * @param containerContext Information about the container in which the task is executing. * @return The storage engine instance. 
*/ @@ -54,5 +55,6 @@ public interface StorageEngineFactory<K, V> { MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition, - SamzaContainerContext containerContext); + JobContext jobContext, + ContainerContext containerContext); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java index 490acc0..6c88fd3 100644 --- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java +++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java @@ -21,11 +21,9 @@ package org.apache.samza.table; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; - import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; -import org.apache.samza.task.TaskContext; /** @@ -40,10 +38,9 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> { /** * Initializes the table during container initialization. * Guaranteed to be invoked as the first operation on the table. 
- * @param containerContext Samza container context - * @param taskContext nullable for global table + * @param context {@link Context} corresponding to this table */ - default void init(SamzaContainerContext containerContext, TaskContext taskContext) { + default void init(Context context) { } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/table/TableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java index 99446e4..350324c 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java @@ -19,11 +19,9 @@ package org.apache.samza.table; import java.util.Map; - import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.task.TaskContext; +import org.apache.samza.context.Context; /** * A table provider provides the implementation for a table. 
It ensures a table is @@ -33,10 +31,9 @@ import org.apache.samza.task.TaskContext; public interface TableProvider { /** * Initialize TableProvider with container and task context - * @param containerContext Samza container context - * @param taskContext nullable for global table + * @param context context for the task */ - void init(SamzaContainerContext containerContext, TaskContext taskContext); + void init(Context context); /** * Get an instance of the table for read/write operations http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/task/InitableTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/InitableTask.java b/samza-api/src/main/java/org/apache/samza/task/InitableTask.java index 6926f16..8bf0619 100644 --- a/samza-api/src/main/java/org/apache/samza/task/InitableTask.java +++ b/samza-api/src/main/java/org/apache/samza/task/InitableTask.java @@ -19,7 +19,8 @@ package org.apache.samza.task; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; + /** * Used as an interface for user processing StreamTasks that need to have specific functionality performed as their StreamTasks @@ -28,9 +29,8 @@ import org.apache.samza.config.Config; public interface InitableTask { /** * Called by TaskRunner each time an implementing task is created. - * @param config Allows accessing of fields in the configuration files that this StreamTask is specified in. * @param context Allows accessing of contextual data of this StreamTask. * @throws Exception Any exception types encountered during the execution of the processing task. 
*/ - void init(Config config, TaskContext context) throws Exception; + void init(Context context) throws Exception; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java deleted file mode 100644 index 007028a..0000000 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task; - -import java.util.Set; - -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.container.TaskName; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.scheduler.ScheduledCallback; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.table.Table; - - -/** - * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during - * initialization in an {@link org.apache.samza.task.InitableTask}. 
- * TODO this will be replaced by {@link org.apache.samza.context.TaskContext} in the near future by SAMZA-1714 - */ -public interface TaskContext { - MetricsRegistry getMetricsRegistry(); - - Set<SystemStreamPartition> getSystemStreamPartitions(); - - Object getStore(String name); - - Table getTable(String tableId); - - TaskName getTaskName(); - - SamzaContainerContext getSamzaContainerContext(); - - /** - * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets - * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task - * (as returned by {@link #getSystemStreamPartitions()}); trying to set the offset for any other partition - * will have no effect. - * - * NOTE: this feature is experimental, and the API may change in a future release. - * - * @param ssp {@link org.apache.samza.system.SystemStreamPartition} whose offset should be set - * @param offset to set for the given {@link org.apache.samza.system.SystemStreamPartition} - * - */ - void setStartingOffset(SystemStreamPartition ssp, String offset); - - /** - * Sets the user-defined context. - * - * @param context the user-defined context to set - */ - default void setUserContext(Object context) { } - - /** - * Gets the user-defined context. - * - * @return the user-defined context if set, else null - */ - default Object getUserContext() { - return null; - } - - /** - * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}. - * The callback will be invoked exclusively with any other operations for this task, - * e.g. processing, windowing and commit. 
- * @param key key for the callback - * @param timestamp epoch time when the callback will be fired, in milliseconds - * @param callback callback to call when the {@code timestamp} is reached - * @param <K> type of the key - */ - <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback); - - /** - * Delete the scheduled {@code callback} for the {@code key}. - * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt. - * @param key callback key - * @param <K> type of the key - */ - <K> void deleteScheduledCallback(K key); -} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java index ad40d35..83532e4 100644 --- a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java +++ b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java @@ -22,10 +22,8 @@ import java.io.Serializable; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; +import org.apache.samza.context.Context; /** * A rate limiter interface used by Samza components to limit throughput of operations @@ -53,10 +51,9 @@ public interface RateLimiter extends Serializable { /** * Initialize this rate limiter, this method should be called during container initialization. 
* - * @param config job configuration - * @param taskContext task context that owns this rate limiter + * @param context {@link Context} that corresponds to this rate limiter */ - void init(Config config, TaskContext taskContext); + void init(Context context); /** * Attempt to acquire the provided number of credits, blocks indefinitely until http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java index b58d5a5..5416af5 100644 --- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java @@ -25,8 +25,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.context.ApplicationContainerContext; +import org.apache.samza.context.ApplicationContainerContextFactory; +import org.apache.samza.context.ApplicationTaskContext; +import org.apache.samza.context.ApplicationTaskContextFactory; import org.apache.samza.metrics.MetricsReporterFactory; -import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; @@ -38,7 +41,6 @@ import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; -import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +49,8 @@ import org.slf4j.LoggerFactory; * This is the base 
class that implements interface {@link ApplicationDescriptor}. * <p> * This base class contains the common objects that are used by both high-level and low-level API applications, such as - * {@link Config}, {@link ContextManager}, and {@link ProcessorLifecycleListenerFactory}. + * {@link Config}, {@link ApplicationContainerContextFactory}, {@link ApplicationTaskContextFactory}, and + * {@link ProcessorLifecycleListenerFactory}. * * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either * {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor} @@ -64,17 +67,8 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>(); final Config config; - // Default to no-op functions in ContextManager - // TODO: this should be replaced by shared context factory defined in SAMZA-1714 - ContextManager contextManager = new ContextManager() { - @Override - public void init(Config config, TaskContext context) { - } - - @Override - public void close() { - } - }; + private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty(); + private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty(); // Default to no-op ProcessorLifecycleListenerFactory ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { }; @@ -90,8 +84,14 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> } @Override - public S withContextManager(ContextManager contextManager) { - this.contextManager = contextManager; + public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) { + this.applicationContainerContextFactoryOptional = Optional.of(factory); + return (S) this; + } + + @Override + public S 
withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) { + this.applicationTaskContextFactoryOptional = Optional.of(factory); return (S) this; } @@ -118,12 +118,27 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> } /** - * Get the {@link ContextManager} associated with this application + * Get the {@link ApplicationContainerContextFactory} specified by the application. + * + * @return {@link ApplicationContainerContextFactory} if application specified it; empty otherwise + */ + public Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> getApplicationContainerContextFactory() { + @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationContainerContext + Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> factoryOptional = + (Optional) this.applicationContainerContextFactoryOptional; + return factoryOptional; + } + + /** + * Get the {@link ApplicationTaskContextFactory} specified by the application. 
* - * @return the {@link ContextManager} for this application + * @return {@link ApplicationTaskContextFactory} if application specified it; empty otherwise */ - public ContextManager getContextManager() { - return contextManager; + public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> getApplicationTaskContextFactory() { + @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationTaskContext + Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> factoryOptional = + (Optional) this.applicationTaskContextFactoryOptional; + return factoryOptional; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java deleted file mode 100644 index 25ffe8f..0000000 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.container; - -import com.google.common.collect.ImmutableSet; -import java.util.function.Function; -import org.apache.samza.checkpoint.OffsetManager; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.metrics.ReadableMetricsRegistry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.table.Table; -import org.apache.samza.table.TableManager; -import org.apache.samza.task.EpochTimeScheduler; -import org.apache.samza.task.TaskContext; -import org.apache.samza.scheduler.ScheduledCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; - - -/** - * TODO this will be replaced by {@link org.apache.samza.context.TaskContextImpl} in the near future by SAMZA-1714 - */ -public class TaskContextImpl implements TaskContext { - private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class); - - private final TaskName taskName; - private final TaskInstanceMetrics metrics; - private final SamzaContainerContext containerContext; - private final Set<SystemStreamPartition> systemStreamPartitions; - private final OffsetManager offsetManager; - private final Function<String, KeyValueStore> kvStoreSupplier; - private final TableManager tableManager; - private final JobModel jobModel; - private final StreamMetadataCache streamMetadataCache; - private final Map<String, Object> objectRegistry = new HashMap<>(); - private final EpochTimeScheduler timerScheduler; - - private Object userContext = null; - - public TaskContextImpl(TaskName taskName, - TaskInstanceMetrics metrics, - SamzaContainerContext containerContext, - Set<SystemStreamPartition> systemStreamPartitions, - OffsetManager offsetManager, - Function<String, KeyValueStore> kvStoreSupplier, - 
TableManager tableManager, - JobModel jobModel, - StreamMetadataCache streamMetadataCache, - ScheduledExecutorService timerExecutor) { - this.taskName = taskName; - this.metrics = metrics; - this.containerContext = containerContext; - this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions); - this.offsetManager = offsetManager; - this.kvStoreSupplier = kvStoreSupplier; - this.tableManager = tableManager; - this.jobModel = jobModel; - this.streamMetadataCache = streamMetadataCache; - this.timerScheduler = EpochTimeScheduler.create(timerExecutor); - } - - @Override - public ReadableMetricsRegistry getMetricsRegistry() { - return metrics.registry(); - } - - @Override - public Set<SystemStreamPartition> getSystemStreamPartitions() { - return systemStreamPartitions; - } - - @Override - public KeyValueStore getStore(String storeName) { - KeyValueStore store = kvStoreSupplier.apply(storeName); - if (store == null) { - LOG.warn("No store found for name: {}", storeName); - } - return store; - } - - @Override - public Table getTable(String tableId) { - if (tableManager != null) { - return tableManager.getTable(tableId); - } else { - LOG.warn("No table manager found"); - return null; - } - } - - @Override - public TaskName getTaskName() { - return taskName; - } - - @Override - public SamzaContainerContext getSamzaContainerContext() { - return containerContext; - } - - @Override - public void setStartingOffset(SystemStreamPartition ssp, String offset) { - offsetManager.setStartingOffset(taskName, ssp, offset); - } - - @Override - public void setUserContext(Object context) { - userContext = context; - } - - @Override - public Object getUserContext() { - return userContext; - } - - @Override - public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) { - timerScheduler.setTimer(key, timestamp, callback); - } - - @Override - public <K> void deleteScheduledCallback(K key) { - timerScheduler.deleteTimer(key); - } - - public void 
registerObject(String name, Object value) { - objectRegistry.put(name, value); - } - - public Object fetchObject(String name) { - return objectRegistry.get(name); - } - - public JobModel getJobModel() { - return jobModel; - } - - public StreamMetadataCache getStreamMetadataCache() { - return streamMetadataCache; - } - - public EpochTimeScheduler getTimerScheduler() { - return timerScheduler; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java index c2c182f..93c7eb1 100644 --- a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java @@ -18,27 +18,36 @@ */ package org.apache.samza.context; +import com.google.common.base.Preconditions; + +import java.util.Objects; +import java.util.Optional; + + public class ContextImpl implements Context { private final JobContext jobContext; private final ContainerContext containerContext; private final TaskContext taskContext; - private final ApplicationContainerContext applicationContainerContext; - private final ApplicationTaskContext applicationTaskContext; + private final Optional<ApplicationContainerContext> applicationContainerContextOptional; + private final Optional<ApplicationTaskContext> applicationTaskContextOptional; /** * @param jobContext non-null job context * @param containerContext non-null framework container context * @param taskContext non-null framework task context - * @param applicationContainerContext nullable application-defined container context - * @param applicationTaskContext nullable application-defined task context + * @param applicationContainerContextOptional optional application-defined container context + * @param 
applicationTaskContextOptional optional application-defined task context */ - public ContextImpl(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext, - ApplicationContainerContext applicationContainerContext, ApplicationTaskContext applicationTaskContext) { - this.jobContext = jobContext; - this.containerContext = containerContext; - this.taskContext = taskContext; - this.applicationContainerContext = applicationContainerContext; - this.applicationTaskContext = applicationTaskContext; + public ContextImpl(JobContext jobContext, + ContainerContext containerContext, + TaskContext taskContext, + Optional<ApplicationContainerContext> applicationContainerContextOptional, + Optional<ApplicationTaskContext> applicationTaskContextOptional) { + this.jobContext = Preconditions.checkNotNull(jobContext, "Job context can not be null"); + this.containerContext = Preconditions.checkNotNull(containerContext, "Container context can not be null"); + this.taskContext = Preconditions.checkNotNull(taskContext, "Task context can not be null"); + this.applicationContainerContextOptional = applicationContainerContextOptional; + this.applicationTaskContextOptional = applicationTaskContextOptional; } @Override @@ -58,17 +67,38 @@ public class ContextImpl implements Context { @Override public ApplicationContainerContext getApplicationContainerContext() { - if (this.applicationContainerContext == null) { + if (!this.applicationContainerContextOptional.isPresent()) { throw new IllegalStateException("No application-defined container context exists"); } - return this.applicationContainerContext; + return this.applicationContainerContextOptional.get(); } @Override public ApplicationTaskContext getApplicationTaskContext() { - if (this.applicationTaskContext == null) { + if (!this.applicationTaskContextOptional.isPresent()) { throw new IllegalStateException("No application-defined task context exists"); } - return this.applicationTaskContext; + return 
this.applicationTaskContextOptional.get(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContextImpl context = (ContextImpl) o; + return Objects.equals(jobContext, context.jobContext) && Objects.equals(containerContext, context.containerContext) + && Objects.equals(taskContext, context.taskContext) && Objects.equals(applicationContainerContextOptional, + context.applicationContainerContextOptional) && Objects.equals(applicationTaskContextOptional, + context.applicationTaskContextOptional); + } + + @Override + public int hashCode() { + return Objects.hash(jobContext, containerContext, taskContext, applicationContainerContextOptional, + applicationTaskContextOptional); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java index 8fe44e4..797e2ca 100644 --- a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java @@ -19,6 +19,8 @@ package org.apache.samza.context; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import scala.Option; public class JobContextImpl implements JobContext { @@ -26,12 +28,30 @@ public class JobContextImpl implements JobContext { private final String jobName; private final String jobId; - public JobContextImpl(Config config, String jobName, String jobId) { + private JobContextImpl(Config config, String jobName, String jobId) { this.config = config; this.jobName = jobName; this.jobId = jobId; } + /** + * Build a {@link JobContextImpl} from a {@link Config} object. 
+ * This extracts some information like job name and job id. + * + * @param config used to extract job information + * @return {@link JobContextImpl} corresponding to the {@code config} + * @throws IllegalArgumentException if job name is not defined in the {@code config} + */ + public static JobContextImpl fromConfigWithDefaults(Config config) { + JobConfig jobConfig = new JobConfig(config); + Option<String> jobName = jobConfig.getName(); + if (jobName.isEmpty()) { + throw new IllegalArgumentException("Job name is not defined in configuration"); + } + String jobId = jobConfig.getJobId(); + return new JobContextImpl(config, jobName.get(), jobId); + } + @Override public Config getConfig() { return this.config; http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java index e975dcd..ec52f8a 100644 --- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java @@ -18,16 +18,21 @@ */ package org.apache.samza.context; -import java.util.function.Function; import org.apache.samza.checkpoint.OffsetManager; +import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.scheduler.CallbackScheduler; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; import org.apache.samza.table.TableManager; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + public class TaskContextImpl implements TaskContext { private final 
TaskModel taskModel; @@ -36,19 +41,26 @@ public class TaskContextImpl implements TaskContext { private final TableManager tableManager; private final CallbackScheduler callbackScheduler; private final OffsetManager offsetManager; + private final JobModel jobModel; + private final StreamMetadataCache streamMetadataCache; + private final Map<String, Object> objectRegistry = new HashMap<>(); public TaskContextImpl(TaskModel taskModel, MetricsRegistry taskMetricsRegistry, Function<String, KeyValueStore> keyValueStoreProvider, TableManager tableManager, CallbackScheduler callbackScheduler, - OffsetManager offsetManager) { + OffsetManager offsetManager, + JobModel jobModel, + StreamMetadataCache streamMetadataCache) { this.taskModel = taskModel; this.taskMetricsRegistry = taskMetricsRegistry; this.keyValueStoreProvider = keyValueStoreProvider; this.tableManager = tableManager; this.callbackScheduler = callbackScheduler; this.offsetManager = offsetManager; + this.jobModel = jobModel; + this.streamMetadataCache = streamMetadataCache; } @Override @@ -84,4 +96,22 @@ public class TaskContextImpl implements TaskContext { public void setStartingOffset(SystemStreamPartition systemStreamPartition, String offset) { this.offsetManager.setStartingOffset(this.taskModel.getTaskName(), systemStreamPartition, offset); } + + // TODO SAMZA-1935: below methods are used by operator code; they should be decoupled from this client API + + public void registerObject(String name, Object value) { + this.objectRegistry.put(name, value); + } + + public Object fetchObject(String name) { + return this.objectRegistry.get(name); + } + + public JobModel getJobModel() { + return this.jobModel; + } + + public StreamMetadataCache getStreamMetadataCache() { + return this.streamMetadataCache; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java index 99ed089..4965f7b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.spec.BroadcastOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.system.ControlMessage; @@ -28,7 +28,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.WatermarkMessage; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -40,14 +39,14 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> { private final SystemStream systemStream; private final String taskName; - BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream systemStream, TaskContext context) { + BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream systemStream, Context context) { this.broadcastOpSpec = broadcastOpSpec; this.systemStream = systemStream; - this.taskName = context.getTaskName().getTaskName(); + this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName(); } @Override - protected void handleInit(Config config, TaskContext context) { + protected void handleInit(Context context) { } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java index 6cc57e0..2a73064 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java @@ -18,14 +18,13 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.InputTransformer; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -44,7 +43,7 @@ public final class InputOperatorImpl extends OperatorImpl<IncomingMessageEnvelop } @Override - protected void handleInit(Config config, TaskContext context) { + protected void handleInit(Context context) { } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 5cafd26..675b211 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -21,23 +21,23 @@ package org.apache.samza.operators.impl; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import 
org.apache.samza.config.MetricsConfig; -import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; -import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContextImpl; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.operators.Scheduler; -import org.apache.samza.operators.functions.ScheduledFunction; -import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.Scheduler; +import org.apache.samza.operators.functions.ScheduledFunction; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.scheduler.CallbackScheduler; +import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.WatermarkMessage; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.HighResolutionClock; import org.slf4j.Logger; @@ -82,16 +82,15 @@ public abstract class OperatorImpl<M, RM> { private EndOfStreamStates eosStates; // watermark states private WatermarkStates watermarkStates; - private TaskContext taskContext; + private CallbackScheduler callbackScheduler; private ControlMessageSender controlMessageSender; /** * Initialize this {@link OperatorImpl} and its user-defined functions. 
* - * @param config the {@link Config} for the task - * @param context the {@link TaskContext} for the task + * @param context the {@link Context} for the task */ - public final void init(Config config, TaskContext context) { + public final void init(Context context) { String opId = getOpImplId(); if (initialized) { @@ -102,32 +101,24 @@ public abstract class OperatorImpl<M, RM> { throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opId)); } - this.highResClock = createHighResClock(config); + this.highResClock = createHighResClock(context.getJobContext().getConfig()); registeredOperators = new HashSet<>(); prevOperators = new HashSet<>(); inputStreams = new HashSet<>(); - MetricsRegistry metricsRegistry = context.getMetricsRegistry(); + // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else + TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext(); + MetricsRegistry metricsRegistry = taskContext.getTaskMetricsRegistry(); this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages"); this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns"); this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns"); - this.taskName = context.getTaskName(); + this.taskName = taskContext.getTaskModel().getTaskName(); - TaskContextImpl taskContext = (TaskContextImpl) context; this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName()); this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName()); this.controlMessageSender = new ControlMessageSender(taskContext.getStreamMetadataCache()); - - if (taskContext.getJobModel() != null) { - ContainerModel containerModel = taskContext.getJobModel().getContainers() - .get(context.getSamzaContainerContext().id); - this.taskModel = 
containerModel.getTasks().get(taskName); - } else { - this.taskModel = null; - this.usedInCurrentTask = true; - } - - this.taskContext = taskContext; - handleInit(config, taskContext); + this.taskModel = taskContext.getTaskModel(); + this.callbackScheduler = taskContext.getCallbackScheduler(); + handleInit(context); initialized = true; } @@ -135,10 +126,9 @@ public abstract class OperatorImpl<M, RM> { /** * Initialize this {@link OperatorImpl} and its user-defined functions. * - * @param config the {@link Config} for the task - * @param context the {@link TaskContext} for the task + * @param context the {@link Context} for the task */ - protected abstract void handleInit(Config config, TaskContext context); + protected abstract void handleInit(Context context); /** * Register an operator that this operator should propagate its results to. @@ -448,7 +438,7 @@ public abstract class OperatorImpl<M, RM> { return new Scheduler<K>() { @Override public void schedule(K key, long time) { - taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> { + callbackScheduler.scheduleCallback(key, time, (k, collector, coordinator) -> { final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn(); if (scheduledFn != null) { final Collection<RM> output = scheduledFn.onCallback(key, time); @@ -468,7 +458,7 @@ public abstract class OperatorImpl<M, RM> { @Override public void delete(K key) { - taskContext.deleteScheduledCallback(key); + callbackScheduler.deleteCallback(key); } }; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index d76c7de..e668b91 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -23,20 +23,18 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.samza.config.Config; import org.apache.samza.config.StreamConfig; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContextImpl; import org.apache.samza.job.model.JobModel; -import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.operators.KV; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.BroadcastOperatorSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.SendToTableOperatorSpec; @@ -46,8 +44,8 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStream; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.Clock; +import org.apache.samza.util.TimestampedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,14 +91,14 @@ public class OperatorImplGraph { * in the {@code specGraph}. 
* * @param specGraph the {@link OperatorSpecGraph} containing the logical {@link OperatorSpec} DAG - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators + * @param context the {@link Context} required to instantiate operators * @param clock the {@link Clock} to get current time */ - public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, TaskContext context, Clock clock) { + public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) { this.clock = clock; - StreamConfig streamConfig = new StreamConfig(config); - TaskContextImpl taskContext = (TaskContextImpl) context; + StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig()); + // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else + TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext(); Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(specGraph) ? 
getProducerTaskCountForIntermediateStreams( @@ -113,15 +111,16 @@ public class OperatorImplGraph { // set states for end-of-stream taskContext.registerObject(EndOfStreamStates.class.getName(), - new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts)); + new EndOfStreamStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts)); // set states for watermark taskContext.registerObject(WatermarkStates.class.getName(), - new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts, getMetricsRegistry(context))); + new WatermarkStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts, + context.getContainerContext().getContainerMetricsRegistry())); specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> { SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId); InputOperatorImpl inputOperatorImpl = - (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context); + (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, context); this.inputOperators.put(systemStream, inputOperatorImpl); }); } @@ -158,18 +157,16 @@ public class OperatorImplGraph { * @param prevOperatorSpec the parent of the current {@code operatorSpec} in the traversal * @param operatorSpec the {@link OperatorSpec} to create the {@link OperatorImpl} for * @param inputStream the source input stream that we traverse the {@link OperatorSpecGraph} from - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators + * @param context the {@link Context} required to instantiate operators * @return the operator implementation for the operatorSpec */ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, - SystemStream inputStream, Config config, TaskContext context) { - + SystemStream inputStream, 
Context context) { if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) { // Either this is the first time we've seen this operatorSpec, or this is a join operator spec // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG. - OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context); - operatorImpl.init(config, context); + OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, context); + operatorImpl.init(context); operatorImpl.registerInputStream(inputStream); if (operatorSpec.getScheduledFn() != null) { @@ -185,8 +182,7 @@ public class OperatorImplGraph { Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { LOG.debug("Creating operator {} with opCode: {}", registeredSpec.getOpId(), registeredSpec.getOpCode()); - OperatorImpl nextImpl = - createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); + OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, context); operatorImpl.registerNextOperator(nextImpl); }); return operatorImpl; @@ -197,9 +193,8 @@ public class OperatorImplGraph { // We still need to traverse the DAG further to register the input streams. 
Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); - registeredSpecs.forEach(registeredSpec -> { - createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); - }); + registeredSpecs.forEach( + registeredSpec -> createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, context)); return operatorImpl; } } @@ -209,19 +204,18 @@ public class OperatorImplGraph { * * @param prevOperatorSpec the original {@link OperatorSpec} that produces output for {@code operatorSpec} from {@link OperatorSpecGraph} * @param operatorSpec the original {@link OperatorSpec} from {@link OperatorSpecGraph} - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators + * @param context the {@link Context} required to instantiate operators * @return the {@link OperatorImpl} implementation instance */ - OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, - Config config, TaskContext context) { + OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, Context context) { + Config config = context.getJobContext().getConfig(); StreamConfig streamConfig = new StreamConfig(config); if (operatorSpec instanceof InputOperatorSpec) { return new InputOperatorImpl((InputOperatorSpec) operatorSpec); } else if (operatorSpec instanceof StreamOperatorSpec) { return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec); } else if (operatorSpec instanceof SinkOperatorSpec) { - return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context); + return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec); } else if (operatorSpec instanceof OutputOperatorSpec) { String streamId = ((OutputOperatorSpec) operatorSpec).getOutputStream().getStreamId(); SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId); @@ -234,12 +228,11 @@ public class 
OperatorImplGraph { return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock); } else if (operatorSpec instanceof JoinOperatorSpec) { return getOrCreatePartialJoinOpImpls((JoinOperatorSpec) operatorSpec, - prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()), - config, context, clock); + prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()), clock); } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) { - return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, config, context); + return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, context); } else if (operatorSpec instanceof SendToTableOperatorSpec) { - return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context); + return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, context); } else if (operatorSpec instanceof BroadcastOperatorSpec) { String streamId = ((BroadcastOperatorSpec) operatorSpec).getOutputStream().getStreamId(); SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId); @@ -250,14 +243,14 @@ public class OperatorImplGraph { } private PartialJoinOperatorImpl getOrCreatePartialJoinOpImpls(JoinOperatorSpec joinOpSpec, boolean isLeft, - Config config, TaskContext context, Clock clock) { + Clock clock) { // get the per task pair of PartialJoinOperatorImpl for the corresponding {@code joinOpSpec} KV<PartialJoinOperatorImpl, PartialJoinOperatorImpl> partialJoinOpImpls = joinOpImpls.computeIfAbsent(joinOpSpec.getOpId(), joinOpId -> { PartialJoinFunction leftJoinFn = createLeftJoinFn(joinOpSpec); PartialJoinFunction rightJoinFn = createRightJoinFn(joinOpSpec); - return new KV(new PartialJoinOperatorImpl(joinOpSpec, true, leftJoinFn, rightJoinFn, config, context, clock), - new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, leftJoinFn, config, context, clock)); + return new KV(new 
PartialJoinOperatorImpl(joinOpSpec, true, leftJoinFn, rightJoinFn, clock), + new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, leftJoinFn, clock)); }); if (isLeft) { // we got here from the left side of the join @@ -288,12 +281,13 @@ public class OperatorImplGraph { } @Override - public void init(Config config, TaskContext context) { + public void init(Context context) { String leftStoreName = joinOpSpec.getLeftOpId(); - leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName); + leftStreamState = + (KeyValueStore<Object, TimestampedValue<Object>>) context.getTaskContext().getStore(leftStoreName); // user-defined joinFn should only be initialized once, so we do it only in left partial join function. - joinFn.init(config, context); + joinFn.init(context); } @Override @@ -320,9 +314,10 @@ public class OperatorImplGraph { } @Override - public void init(Config config, TaskContext context) { + public void init(Context context) { String rightStoreName = joinOpSpec.getRightOpId(); - rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName); + rightStreamState = + (KeyValueStore<Object, TimestampedValue<Object>>) context.getTaskContext().getStore(rightStoreName); // user-defined joinFn should only be initialized once, // so we do it only in left partial join function and not here again. @@ -405,9 +400,4 @@ public class OperatorImplGraph { private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) { return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet()); } - - private static MetricsRegistry getMetricsRegistry(TaskContext context) { - final SamzaContainerContext containerContext = context.getSamzaContainerContext(); - return containerContext != null ? 
containerContext.metricsRegistry : context.getMetricsRegistry(); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java index 22fbb1b..407cdd9 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java @@ -18,7 +18,7 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputOperatorSpec; @@ -26,7 +26,6 @@ import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -49,7 +48,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> { } @Override - protected void handleInit(Config config, TaskContext context) { + protected void handleInit(Context context) { } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index 0cdde49..55658eb 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -19,16 +19,15 @@ package org.apache.samza.operators.impl; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.Clock; +import org.apache.samza.util.TimestampedValue; import java.util.Collection; import java.util.Collections; @@ -54,7 +53,7 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> { PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean isLeftSide, PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn, PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn, - Config config, TaskContext context, Clock clock) { + Clock clock) { this.joinOpSpec = joinOpSpec; this.isLeftSide = isLeftSide; this.thisPartialJoinFn = thisPartialJoinFn; @@ -64,8 +63,8 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> { } @Override - protected void handleInit(Config config, TaskContext context) { - this.thisPartialJoinFn.init(config, context); + protected void handleInit(Context context) { + this.thisPartialJoinFn.init(context); } @Override
