http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java index 63e269d..88644ce 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; -import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContextImpl; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.PartitionByOperatorSpec; @@ -30,7 +30,6 @@ import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStream; import org.apache.samza.system.WatermarkMessage; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -50,20 +49,20 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { private final ControlMessageSender controlMessageSender; PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, - SystemStream systemStream, TaskContext context) { + SystemStream systemStream, Context context) { this.partitionByOpSpec = partitionByOpSpec; this.systemStream = systemStream; this.keyFunction = partitionByOpSpec.getKeyFunction(); this.valueFunction = partitionByOpSpec.getValueFunction(); - this.taskName = context.getTaskName().getTaskName(); - StreamMetadataCache streamMetadataCache = ((TaskContextImpl) 
context).getStreamMetadataCache(); + this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName(); + StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context.getTaskContext()).getStreamMetadataCache(); this.controlMessageSender = new ControlMessageSender(streamMetadataCache); } @Override - protected void handleInit(Config config, TaskContext context) { - this.keyFunction.init(config, context); - this.valueFunction.init(config, context); + protected void handleInit(Context context) { + this.keyFunction.init(context); + this.valueFunction.init(context); } @Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java index 5ce1328..be3e0a3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java @@ -18,18 +18,17 @@ */ package org.apache.samza.operators.impl; -import java.util.Collection; -import java.util.Collections; - -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * Implementation of a send-stream-to-table operator that stores the record @@ -43,13 +42,13 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void> private final SendToTableOperatorSpec<K, V> sendToTableOpSpec; private final ReadWriteTable<K, V> table; - SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Config config, TaskContext context) { + SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) { this.sendToTableOpSpec = sendToTableOpSpec; - this.table = (ReadWriteTable) context.getTable(sendToTableOpSpec.getTableSpec().getId()); + this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableSpec().getId()); } @Override - protected void 
handleInit(Config config, TaskContext context) { + protected void handleInit(Context context) { } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index 5dbe27f..6fe9006 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -18,12 +18,11 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -38,14 +37,14 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, Void> { private final SinkOperatorSpec<M> sinkOpSpec; private final SinkFunction<M> sinkFn; - SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) { + SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec) { this.sinkOpSpec = sinkOpSpec; this.sinkFn = sinkOpSpec.getSinkFn(); } @Override - protected void handleInit(Config config, TaskContext context) { - this.sinkFn.init(config, context); + protected void handleInit(Context context) { + this.sinkFn.init(context); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index 6cd426b..1a615bd 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -18,12 +18,11 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.Collection; @@ -46,8 +45,8 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> { } @Override - protected void handleInit(Config config, TaskContext context) { - transformFn.init(config, context); + protected void handleInit(Context context) { + transformFn.init(context); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java index 54a5770..d44241d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java @@ -18,18 +18,17 @@ */ package org.apache.samza.operators.impl; -import java.util.Collection; -import java.util.Collections; - -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import 
org.apache.samza.operators.KV; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.table.ReadableTable; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * Implementation of a stream-table join operator that first retrieve the value of @@ -45,15 +44,14 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec; private final ReadableTable<K, ?> table; - StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, - Config config, TaskContext context) { + StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) { this.joinOpSpec = joinOpSpec; - this.table = (ReadableTable) context.getTable(joinOpSpec.getTableSpec().getId()); + this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableSpec().getId()); } @Override - protected void handleInit(Config config, TaskContext context) { - this.joinOpSpec.getJoinFn().init(config, context); + protected void handleInit(Context context) { + this.joinOpSpec.getJoinFn().init(context); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index b175671..c09c5f8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -21,14 +21,13 @@ package 
org.apache.samza.operators.impl; import com.google.common.base.Preconditions; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.impl.store.TimeSeriesKey; import org.apache.samza.operators.impl.store.TimeSeriesStore; import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl; -import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.triggers.FiringType; @@ -45,9 +44,9 @@ import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.storage.kv.ClosableIterator; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.Clock; +import org.apache.samza.util.TimestampedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,23 +110,23 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje } @Override - protected void handleInit(Config config, TaskContext context) { + protected void handleInit(Context context) { KeyValueStore<TimeSeriesKey<K>, Object> store = - (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId()); + (KeyValueStore<TimeSeriesKey<K>, Object>) context.getTaskContext().getStore(windowOpSpec.getOpId()); if (initializer != null) { - initializer.init(config, context); + initializer.init(context); } if (keyFn != null) { - keyFn.init(config, context); + keyFn.init(context); } // For aggregating windows, we use the store in over-write mode since we only retain the aggregated // value. Else, we use the store in append-mode. 
if (foldLeftFn != null) { - foldLeftFn.init(config, context); + foldLeftFn.init(context); timeSeriesStore = new TimeSeriesStoreImpl(store, false); } else { timeSeriesStore = new TimeSeriesStoreImpl(store, true); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java index 4e640dc..c1d62f5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java @@ -20,12 +20,11 @@ package org.apache.samza.operators.spec; import java.util.ArrayList; import java.util.Collection; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.task.TaskContext; /** @@ -50,8 +49,8 @@ class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> { } @Override - public void init(Config config, TaskContext context) { - filterFn.init(config, context); + public void init(Context context) { + filterFn.init(context); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java index 6ce522f..d3a587a 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java @@ -20,12 +20,11 @@ package org.apache.samza.operators.spec; import java.util.ArrayList; import java.util.Collection; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.task.TaskContext; /** @@ -53,8 +52,8 @@ class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> { } @Override - public void init(Config config, TaskContext context) { - mapFn.init(config, context); + public void init(Context context) { + mapFn.init(context); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 26e52f2..3149989 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -35,6 +36,11 @@ import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.container.SamzaContainer; import 
org.apache.samza.container.SamzaContainerListener; +import org.apache.samza.context.ApplicationContainerContext; +import org.apache.samza.context.ApplicationContainerContextFactory; +import org.apache.samza.context.ApplicationTaskContext; +import org.apache.samza.context.ApplicationTaskContextFactory; +import org.apache.samza.context.JobContextImpl; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.coordinator.JobCoordinatorListener; @@ -46,6 +52,8 @@ import org.apache.samza.util.ScalaJavaUtil; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; + /** * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an @@ -97,6 +105,16 @@ public class StreamProcessor { private final JobCoordinator jobCoordinator; private final ProcessorLifecycleListener processorListener; private final TaskFactory taskFactory; + /** + * Type parameter needs to be {@link ApplicationContainerContext} so that we can eventually call the base methods of + * the context object. + */ + private final Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional; + /** + * Type parameter needs to be {@link ApplicationTaskContext} so that we can eventually call the base methods of the + * context object. + */ + private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional; private final Map<String, MetricsReporter> customMetricsReporter; private final Config config; private final long taskShutdownMs; @@ -143,57 +161,60 @@ public class StreamProcessor { JobCoordinatorListener jobCoordinatorListener = null; /** - * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}. 
- * - * <p> - * On startup, StreamProcessor starts the JobCoordinator. Schedules the SamzaContainer to run in a ExecutorService - * when it receives new {@link JobModel} from JobCoordinator. - * <p> - * - * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor. + * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except + * it creates a {@link JobCoordinator} instead of accepting it as an argument. * - * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}. - * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics. - * @param taskFactory the {@link TaskFactory} to be used for creating task instances. - * @param processorListener listener to the StreamProcessor life cycle. + * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, + * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead. */ + @Deprecated public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, ProcessorLifecycleListener processorListener) { this(config, customMetricsReporters, taskFactory, processorListener, null); } /** - * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener)}, except the - * {@link JobCoordinator} is given for this {@link StreamProcessor}. 
- * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} - * @param customMetricsReporters metric Reporter - * @param taskFactory task factory to instantiate the Task + * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, + * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences: + * <ol> + * <li>Passes {@link Optional#empty()} for application-defined context factories</li> + * <li>Accepts a {@link ProcessorLifecycleListener} directly instead of a + * {@link StreamProcessorLifecycleListenerFactory}</li> + * </ol> + * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, + * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead. + * * @param processorListener listener to the StreamProcessor life cycle - * @param jobCoordinator the instance of {@link JobCoordinator} */ + @Deprecated public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { - this(config, customMetricsReporters, taskFactory, sp -> processorListener, jobCoordinator); + this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), sp -> processorListener, + jobCoordinator); } /** - * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except - * there is a {@link StreamProcessorLifecycleListenerFactory} as input instead of {@link ProcessorLifecycleListener}. - * This is useful to create a {@link ProcessorLifecycleListener} with a reference to this {@link StreamProcessor} + * Builds a {@link StreamProcessor} with full specification of processing components. 
* * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} - * @param customMetricsReporters metric Reporter + * @param customMetricsReporters registered with the metrics system to report metrics * @param taskFactory task factory to instantiate the Task - * @param listenerFactory listener to the StreamProcessor life cycle + * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context + * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context + * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle * @param jobCoordinator the instance of {@link JobCoordinator} */ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional, + Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional, StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) { Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null."); - this.taskFactory = taskFactory; this.config = config; - this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); this.customMetricsReporter = customMetricsReporters; + this.taskFactory = taskFactory; + this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional; + this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional; + this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); this.jobCoordinator = (jobCoordinator != null) ? 
jobCoordinator : createJobCoordinator(); this.jobCoordinatorListener = createJobCoordinatorListener(); this.jobCoordinator.setListener(jobCoordinatorListener); @@ -283,7 +304,10 @@ public class StreamProcessor { @VisibleForTesting SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { - return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory); + return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter), + this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config), + Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)), + Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null))); } private JobCoordinator createJobCoordinator() { http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 7100482..a5eeba1 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -167,7 +167,8 @@ public class LocalApplicationRunner implements ApplicationRunner { // TODO: the null processorId has to be fixed after SAMZA-1835 appDesc.getMetricsReporterFactories().forEach((name, factory) -> reporters.put(name, factory.getMetricsReporter(name, null, config))); - return new StreamProcessor(config, reporters, taskFactory, listenerFactory, null); + return new StreamProcessor(config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), + appDesc.getApplicationTaskContextFactory(), listenerFactory, null); } /** 
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index add7e69..94ff1eb 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -25,8 +25,8 @@ import java.util.Random; import org.slf4j.MDC; import org.apache.samza.SamzaException; import org.apache.samza.application.ApplicationDescriptor; -import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -36,6 +36,7 @@ import org.apache.samza.container.ContainerHeartbeatMonitor; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainer$; import org.apache.samza.container.SamzaContainerListener; +import org.apache.samza.context.JobContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.task.TaskFactory; @@ -44,6 +45,8 @@ import org.apache.samza.util.SamzaUncaughtExceptionHandler; import org.apache.samza.util.ScalaJavaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; + /** * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN. 
@@ -93,9 +96,11 @@ public class LocalContainerRunner { SamzaContainer container = SamzaContainer$.MODULE$.apply( containerId, jobModel, - config, ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)), - taskFactory); + taskFactory, + JobContextImpl.fromConfigWithDefaults(config), + Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)), + Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null))); ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory() .createInstance(new ProcessorContext() { }, config); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 9a76d75..be074ee 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -27,13 +27,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; - import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.StorageConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.ContainerContextImpl; +import org.apache.samza.context.JobContextImpl; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.stream.CoordinatorStreamManager; import org.apache.samza.job.model.ContainerModel; @@ -209,8 +210,7 @@ public class StorageRecovery extends CommandLine { for (ContainerModel containerModel : containers.values()) { 
HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>(); - SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getId(), jobConfig, containerModel.getTasks() - .keySet(), new MetricsRegistryMap()); + ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap()); for (TaskModel taskModel : containerModel.getTasks().values()) { HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers(); @@ -233,6 +233,7 @@ public class StorageRecovery extends CommandLine { null, new MetricsRegistryMap(), changeLogSystemStreamPartition, + JobContextImpl.fromConfigWithDefaults(jobConfig), containerContext); taskStores.put(storeName, storageEngine); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/TableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java index ae72414..d7b15a4 100644 --- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java +++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java @@ -18,21 +18,19 @@ */ package org.apache.samza.table; -import java.util.HashMap; -import java.util.Map; - +import com.google.common.base.Preconditions; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; /** @@ -97,12 +95,10 @@ public class 
TableManager { /** * Initialize table providers with container and task contexts - * @param containerContext context for the Samza container - * @param taskContext context for the current task, nullable for global tables + * @param context context for the task */ - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - Preconditions.checkNotNull(containerContext, "null container context."); - tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext)); + public void init(Context context) { + tableContexts.values().forEach(ctx -> ctx.tableProvider.init(context)); initialized = true; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java index b7aa33c..32d2bed 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java @@ -19,25 +19,23 @@ package org.apache.samza.table.caching; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - +import com.google.common.base.Preconditions; import org.apache.samza.SamzaException; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.utils.DefaultTableReadMetrics; import org.apache.samza.table.utils.DefaultTableWriteMetrics; import 
org.apache.samza.table.utils.TableMetricsUtil; -import org.apache.samza.task.TaskContext; -import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** @@ -91,10 +89,10 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); - writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + public void init(Context context) { + readMetrics = new DefaultTableReadMetrics(context, this, tableId); + writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); tableMetricsUtil.newGauge("hit-rate", () -> hitRate()); tableMetricsUtil.newGauge("miss-rate", () -> missRate()); tableMetricsUtil.newGauge("req-count", () -> requestCount()); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java index d5f7767..c959a56 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java @@ -54,13 +54,13 @@ public class CachingTableProvider extends BaseTableProvider { @Override 
public Table getTable() { String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID); - ReadableTable table = (ReadableTable) taskContext.getTable(realTableId); + ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId); String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID); ReadWriteTable cache; if (cacheTableId != null) { - cache = (ReadWriteTable) taskContext.getTable(cacheTableId); + cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId); } else { cache = createDefaultCacheTable(realTableId); defaultCaches.add(cache); @@ -68,7 +68,7 @@ public class CachingTableProvider extends BaseTableProvider { boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND)); CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround); - cachingTable.init(containerContext, taskContext); + cachingTable.init(this.context); return cachingTable; } @@ -97,7 +97,7 @@ public class CachingTableProvider extends BaseTableProvider { readTtlMs, writeTtlMs, cacheSize)); GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build()); - cacheTable.init(containerContext, taskContext); + cacheTable.init(this.context); return cacheTable; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java index a8beb3b..5f77ee4 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java @@ -19,19 +19,17 @@ package org.apache.samza.table.caching.guava; -import java.util.ArrayList; -import 
java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - +import com.google.common.cache.Cache; import org.apache.samza.SamzaException; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.utils.TableMetricsUtil; -import org.apache.samza.task.TaskContext; -import com.google.common.cache.Cache; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; /** @@ -54,8 +52,8 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> { * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + public void init(Context context) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); // hit- and miss-rate are provided by CachingTable. 
tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount()); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java index 1513249..39f332e 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java @@ -47,7 +47,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider { public Table getTable() { Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE)); GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache); - table.init(containerContext, taskContext); + table.init(this.context); guavaTables.add(table); return table; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index 9ef4c1b..4cbc270 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -19,21 +19,19 @@ package org.apache.samza.table.remote; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; - +import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Preconditions; import org.apache.samza.SamzaException; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.utils.DefaultTableWriteMetrics; import org.apache.samza.table.utils.TableMetricsUtil; -import org.apache.samza.task.TaskContext; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; /** @@ -63,10 +61,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - super.init(containerContext, taskContext); - writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + public void init(Context context) { + super.init(context); + writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns")); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index b3d82f3..9487e39 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -19,28 +19,26 @@ package org.apache.samza.table.remote; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.function.BiFunction; -import java.util.function.Function; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.samza.SamzaException; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.utils.DefaultTableReadMetrics; import org.apache.samza.table.utils.TableMetricsUtil; -import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import java.util.function.Function; /** @@ -110,9 +108,9 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + public void init(Context context) { + readMetrics = new DefaultTableReadMetrics(context, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); 
readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns")); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index cae0bbd..9415e70 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -19,14 +19,6 @@ package org.apache.samza.table.remote; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; import org.apache.samza.table.retry.RetriableReadFunction; @@ -37,6 +29,14 @@ import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.util.RateLimiter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG; @@ -83,7 +83,7 @@ public class RemoteTableProvider extends BaseTableProvider { TableReadFunction readFn = getReadFn(); RateLimiter rateLimiter = deserializeObject(RATE_LIMITER); if (rateLimiter != null) { - rateLimiter.init(containerContext.config, taskContext); + 
rateLimiter.init(this.context); } TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN); TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RL_READ_TAG); @@ -150,7 +150,7 @@ public class RemoteTableProvider extends BaseTableProvider { writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } - TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId); if (readRetryPolicy != null) { ((RetriableReadFunction) readFn).setMetrics(metricsUtil); } @@ -158,7 +158,7 @@ public class RemoteTableProvider extends BaseTableProvider { ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil); } - table.init(containerContext, taskContext); + table.init(this.context); tables.add(table); return table; } @@ -184,7 +184,7 @@ public class RemoteTableProvider extends BaseTableProvider { private TableReadFunction<?, ?> getReadFn() { TableReadFunction<?, ?> readFn = deserializeObject(READ_FN); if (readFn != null) { - readFn.init(containerContext.config, taskContext); + readFn.init(this.context); } return readFn; } @@ -192,7 +192,7 @@ public class RemoteTableProvider extends BaseTableProvider { private TableWriteFunction<?, ?> getWriteFn() { TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN); if (writeFn != null) { - writeFn.init(containerContext.config, taskContext); + writeFn.init(this.context); } return writeFn; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java index 960e2a4..dfbd835 100644 --- 
a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java @@ -22,10 +22,9 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +38,7 @@ abstract public class BaseTableProvider implements TableProvider { final protected TableSpec tableSpec; - protected SamzaContainerContext containerContext; - protected TaskContext taskContext; + protected Context context; public BaseTableProvider(TableSpec tableSpec) { this.tableSpec = tableSpec; @@ -50,9 +48,8 @@ abstract public class BaseTableProvider implements TableProvider { * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - this.containerContext = containerContext; - this.taskContext = taskContext; + public void init(Context context) { + this.context = context; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java index 2acd082..090c8c1 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java @@ -18,11 +18,10 @@ */ package org.apache.samza.table.utils; -import org.apache.samza.container.SamzaContainerContext; +import 
org.apache.samza.context.Context; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; -import org.apache.samza.task.TaskContext; /** @@ -39,14 +38,12 @@ public class DefaultTableReadMetrics { /** * Constructor based on container and task container context * - * @param containerContext container context - * @param taskContext task context + * @param context {@link Context} for this task * @param table underlying table * @param tableId table Id */ - public DefaultTableReadMetrics(SamzaContainerContext containerContext, TaskContext taskContext, - Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + public DefaultTableReadMetrics(Context context, Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); getNs = tableMetricsUtil.newTimer("get-ns"); getAllNs = tableMetricsUtil.newTimer("getAll-ns"); numGets = tableMetricsUtil.newCounter("num-gets"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java index a32d6d5..69d4ef2 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java @@ -18,11 +18,10 @@ */ package org.apache.samza.table.utils; -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; -import org.apache.samza.task.TaskContext; 
public class DefaultTableWriteMetrics { @@ -43,14 +42,12 @@ public class DefaultTableWriteMetrics { /** * Utility class that contains the default set of write metrics. * - * @param containerContext container context - * @param taskContext task context + * @param context {@link Context} for this task * @param table underlying table * @param tableId table Id */ - public DefaultTableWriteMetrics(SamzaContainerContext containerContext, TaskContext taskContext, - Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + public DefaultTableWriteMetrics(Context context, Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); putNs = tableMetricsUtil.newTimer("put-ns"); putAllNs = tableMetricsUtil.newTimer("putAll-ns"); deleteNs = tableMetricsUtil.newTimer("delete-ns"); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java index 6805c64..1b19272 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java @@ -19,18 +19,16 @@ package org.apache.samza.table.utils; -import java.util.function.Supplier; - import com.google.common.base.Preconditions; - -import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.context.Context; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; import org.apache.samza.table.caching.SupplierGauge; -import 
org.apache.samza.task.TaskContext; + +import java.util.function.Supplier; /** @@ -46,21 +44,16 @@ public class TableMetricsUtil { /** * Constructor based on container context * - * @param containerContext container context - * @param taskContext task context + * @param context {@link Context} for this task * @param table underlying table * @param tableId table Id */ - public TableMetricsUtil(SamzaContainerContext containerContext, TaskContext taskContext, - Table table, String tableId) { - - Preconditions.checkNotNull(containerContext); + public TableMetricsUtil(Context context, Table table, String tableId) { + Preconditions.checkNotNull(context); Preconditions.checkNotNull(table); Preconditions.checkNotNull(tableId); - this.metricsRegistry = taskContext == null // The table is at container level, when the task - ? containerContext.metricsRegistry // context passed in is null - : taskContext.getMetricsRegistry(); + this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry(); this.groupName = table.getClass().getSimpleName(); this.tableId = tableId; } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 111869c..6c255f1 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -33,16 +33,15 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; - import org.apache.samza.SamzaException; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskInstanceMetrics; import 
org.apache.samza.container.TaskName; -import org.apache.samza.util.HighResolutionClock; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.HighResolutionClock; import org.apache.samza.util.Throttleable; import org.apache.samza.util.ThrottlingScheduler; import org.slf4j.Logger; @@ -374,7 +373,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { }, commitMs, commitMs, TimeUnit.MILLISECONDS); } - final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler(); + final EpochTimeScheduler epochTimeScheduler = task.epochTimeScheduler(); if (epochTimeScheduler != null) { epochTimeScheduler.registerListener(() -> { state.needScheduler(); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java index e2fea95..fcd9766 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java @@ -20,7 +20,7 @@ package org.apache.samza.task; import java.util.concurrent.ExecutorService; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.system.IncomingMessageEnvelope; @@ -40,9 +40,9 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi } @Override - public void init(Config config, TaskContext context) throws Exception { + public void init(Context context) throws Exception { if (wrappedTask instanceof InitableTask) { - ((InitableTask) wrappedTask).init(config, context); + ((InitableTask) wrappedTask).init(context); } } 
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index aa896c2..218ba5d 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -18,14 +18,13 @@ */ package org.apache.samza.task; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.system.EndOfStreamMessage; -import org.apache.samza.system.MessageType; -import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; +import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.MessageType; import org.apache.samza.system.SystemStream; import org.apache.samza.system.WatermarkMessage; import org.apache.samza.util.Clock; @@ -42,8 +41,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class); private final OperatorSpecGraph specGraph; - // TODO: to be replaced by proper scope of shared context factory in SAMZA-1714 - private final ContextManager contextManager; private final Clock clock; private OperatorImplGraph operatorImplGraph; @@ -52,17 +49,15 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT * Constructs an adaptor task to run the user-implemented {@link OperatorSpecGraph}. 
* @param specGraph the serialized version of user-implemented {@link OperatorSpecGraph} * that includes the logical DAG - * @param contextManager the {@link ContextManager} used to set up the shared context used by operators in the DAG * @param clock the {@link Clock} to use for time-keeping */ - public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager, Clock clock) { + public StreamOperatorTask(OperatorSpecGraph specGraph, Clock clock) { this.specGraph = specGraph.clone(); - this.contextManager = contextManager; this.clock = clock; } - public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager) { - this(specGraph, contextManager, SystemClock.instance()); + public StreamOperatorTask(OperatorSpecGraph specGraph) { + this(specGraph, SystemClock.instance()); } /** @@ -75,20 +70,13 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph} * corresponding to the logical DAG. 
* - * @param config allows accessing of fields in the configuration files that this StreamTask is specified in * @param context allows initializing and accessing contextual data of this StreamTask * @throws Exception in case of initialization errors */ @Override - public final void init(Config config, TaskContext context) throws Exception { - - // get the user-implemented per task context manager and initialize it - if (this.contextManager != null) { - this.contextManager.init(config, context); - } - + public final void init(Context context) throws Exception { // create the operator impl DAG corresponding to the logical operator spec DAG - this.operatorImplGraph = new OperatorImplGraph(specGraph, config, context, clock); + this.operatorImplGraph = new OperatorImplGraph(specGraph, context, clock); } /** @@ -133,9 +121,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT @Override public void close() throws Exception { - if (this.contextManager != null) { - this.contextManager.close(); - } if (operatorImplGraph != null) { operatorImplGraph.close(); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index 834777b..c312fac 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -48,8 +48,8 @@ public class TaskFactoryUtil { if (appDesc instanceof TaskApplicationDescriptorImpl) { return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory(); } else if (appDesc instanceof StreamApplicationDescriptorImpl) { - return (StreamTaskFactory) () -> new StreamOperatorTask(((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph(), - 
((StreamApplicationDescriptorImpl) appDesc).getContextManager()); + return (StreamTaskFactory) () -> new StreamOperatorTask( + ((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph()); } throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or " + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName())); http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java index 1cf9a9c..a91d663 100644 --- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java +++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java @@ -18,21 +18,20 @@ */ package org.apache.samza.util; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.samza.container.TaskName; +import org.apache.samza.context.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; - import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -106,16 +105,15 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter { } @Override - public void init(Config config, TaskContext taskContext) { + 
public void init(Context context) { this.tagToRateLimiterMap = Collections.unmodifiableMap(tagToTargetRateMap.entrySet().stream() .map(e -> { String tag = e.getKey(); - int effectiveRate = e.getValue(); - if (taskContext != null) { - effectiveRate /= taskContext.getSamzaContainerContext().taskNames.size(); - LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d", - taskContext.getTaskName(), tag, effectiveRate)); - } + int numTasksInContainer = context.getContainerContext().getContainerModel().getTasks().keySet().size(); + int effectiveRate = e.getValue() / numTasksInContainer; + TaskName taskName = context.getTaskContext().getTaskModel().getTaskName(); + LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d", taskName, tag, + effectiveRate)); return new ImmutablePair<>(tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate)); }) .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue)) http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 3c10aae..3292986 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -43,6 +43,7 @@ import org.apache.samza.config._ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor} import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor} +import org.apache.samza.context._ import org.apache.samza.job.model.{ContainerModel, 
JobModel} import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter} import org.apache.samza.serializers._ @@ -122,9 +123,13 @@ object SamzaContainer extends Logging { def apply( containerId: String, jobModel: JobModel, - config: Config, customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), - taskFactory: TaskFactory[_]) = { + taskFactory: TaskFactory[_], + jobContext: JobContext, + applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]], + applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]] + ) = { + val config = jobContext.getConfig val containerModel = jobModel.getContainers.get(containerId) val containerName = "samza-container-%s" format containerId val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions @@ -488,8 +493,10 @@ object SamzaContainer extends Logging { .asScala .map(_.getTaskName) .toSet - val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry) + val containerContext = new ContainerContextImpl(containerModel, samzaContainerMetrics.registry) + val applicationContainerContextOption = applicationContainerContextFactoryOption + .map(_.create(jobContext, containerContext)) val storeWatchPaths = new util.HashSet[Path]() @@ -571,6 +578,7 @@ object SamzaContainer extends Logging { collector, taskInstanceMetrics.registry, changeLogSystemStreamPartition, + jobContext, containerContext) (storeName, storageEngine) } @@ -635,13 +643,11 @@ object SamzaContainer extends Logging { def createTaskInstance(task: Any): TaskInstance = new TaskInstance( task = task, - taskName = taskName, - config = config, + taskModel = taskModel, metrics = taskInstanceMetrics, systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, collector = collector, - containerContext = containerContext, offsetManager = offsetManager, 
storageManager = storageManager, tableManager = tableManager, @@ -652,7 +658,11 @@ object SamzaContainer extends Logging { streamMetadataCache = streamMetadataCache, timerExecutor = timerExecutor, sideInputSSPs = taskSideInputSSPs, - sideInputStorageManager = sideInputStorageManager) + sideInputStorageManager = sideInputStorageManager, + jobContext = jobContext, + containerContext = containerContext, + applicationContainerContextOption = applicationContainerContextOption, + applicationTaskContextFactoryOption = applicationTaskContextFactoryOption) val taskInstance = createTaskInstance(task) @@ -708,7 +718,7 @@ object SamzaContainer extends Logging { info("Samza container setup complete.") new SamzaContainer( - containerContext = containerContext, + config = config, taskInstances = taskInstances, runLoop = runLoop, systemAdmins = systemAdmins, @@ -722,10 +732,11 @@ object SamzaContainer extends Logging { diskSpaceMonitor = diskSpaceMonitor, hostStatisticsMonitor = memoryStatisticsMonitor, taskThreadPool = taskThreadPool, - timerExecutor = timerExecutor) + timerExecutor = timerExecutor, + containerContext = containerContext, + applicationContainerContextOption = applicationContainerContextOption) } - /** * Builds the set of SSPs for all changelogs on this container. 
*/ @@ -741,7 +752,7 @@ object SamzaContainer extends Logging { } class SamzaContainer( - containerContext: SamzaContainerContext, + config: Config, taskInstances: Map[TaskName, TaskInstance], runLoop: Runnable, systemAdmins: SystemAdmins, @@ -756,12 +767,14 @@ class SamzaContainer( reporters: Map[String, MetricsReporter] = Map(), jvm: JvmMetrics = null, taskThreadPool: ExecutorService = null, - timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor) extends Runnable with Logging { + timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor, + containerContext: ContainerContext, + applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging { - val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS) + val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS) var shutdownHookThread: Thread = null var jmxServer: JmxServer = null - val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled + val isAutoCommitEnabled = config.isAutoCommitEnabled @volatile private var status = SamzaContainerStatus.NOT_STARTED private var exceptionSeen: Throwable = null @@ -789,6 +802,7 @@ class SamzaContainer( status = SamzaContainerStatus.STARTING jmxServer = new JmxServer() + applicationContainerContextOption.foreach(_.start) startMetrics startDiagnostics @@ -841,6 +855,8 @@ class SamzaContainer( shutdownSecurityManger shutdownAdmins + applicationContainerContextOption.foreach(_.stop) + if (!status.equals(SamzaContainerStatus.FAILED)) { status = SamzaContainerStatus.STOPPED } @@ -930,18 +946,18 @@ class SamzaContainer( } def startDiagnostics { - if (containerContext.config.getDiagnosticsEnabled) { + if (config.getDiagnosticsEnabled) { info("Starting diagnostics.") try { - val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass). 
+ val diagnosticsAppender = Class.forName(config.getDiagnosticsAppenderClass). getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics); } catch { case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => { error("Failed to instantiate diagnostic appender", e) throw new ConfigException("Failed to instantiate diagnostic appender class " + - containerContext.config.getDiagnosticsAppenderClass, e) + config.getDiagnosticsAppenderClass, e) } } } @@ -958,24 +974,25 @@ class SamzaContainer( } def storeContainerLocality { - val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled + val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled if (isHostAffinityEnabled) { - val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry) - val containerName = "SamzaContainer-" + String.valueOf(containerContext.id) + val localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry) + val containerId = containerContext.getContainerModel.getId + val containerName = "SamzaContainer-" + containerId info("Registering %s with metadata store" format containerName) try { val hostInet = Util.getLocalHost val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else "" val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else "" info("Writing container locality and JMX address to metadata store") - localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName) + localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName) } catch { case uhe: UnknownHostException => warn("Received UnknownHostException when persisting locality info for container %s: " + - "%s" format (containerContext.id, uhe.getMessage)) //No-op + "%s" format (containerId, uhe.getMessage)) //No-op case 
unknownException: Throwable => warn("Received an exception when persisting locality info for container %s: " + - "%s" format (containerContext.id, unknownException.getMessage)) + "%s" format (containerId, unknownException.getMessage)) } finally { info("Shutting down locality manager.") localityManager.close() @@ -1016,7 +1033,6 @@ class SamzaContainer( systemAdmins.start } - def startProducers { info("Registering task instances with producers.") @@ -1092,7 +1108,6 @@ class SamzaContainer( systemAdmins.stop } - def shutdownProducers { info("Shutting down producer multiplexer.") @@ -1185,4 +1200,4 @@ class SamzaContainer( hostStatisticsMonitor.stop() } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 9f4fd17..f8e9c63 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -20,15 +20,17 @@ package org.apache.samza.container +import java.util.Optional import java.util.concurrent.ScheduledExecutorService import org.apache.samza.SamzaException import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.job.model.JobModel +import org.apache.samza.context._ +import org.apache.samza.job.model.{JobModel, TaskModel} import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.scheduler.ScheduledCallback +import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback} import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.{TaskSideInputStorageManager, 
TaskStorageManager} import org.apache.samza.system._ @@ -36,19 +38,17 @@ import org.apache.samza.table.TableManager import org.apache.samza.task._ import org.apache.samza.util.{Logging, ScalaJavaUtil} -import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map class TaskInstance( val task: Any, - val taskName: TaskName, - config: Config, + taskModel: TaskModel, val metrics: TaskInstanceMetrics, systemAdmins: SystemAdmins, consumerMultiplexer: SystemConsumers, collector: TaskInstanceCollector, - containerContext: SamzaContainerContext, val offsetManager: OffsetManager = new OffsetManager, storageManager: TaskStorageManager = null, tableManager: TableManager = null, @@ -59,15 +59,23 @@ class TaskInstance( streamMetadataCache: StreamMetadataCache = null, timerExecutor : ScheduledExecutorService = null, sideInputSSPs: Set[SystemStreamPartition] = Set(), - sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging { - + sideInputStorageManager: TaskSideInputStorageManager = null, + jobContext: JobContext, + containerContext: ContainerContext, + applicationContainerContextOption: Option[ApplicationContainerContext], + applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]] +) extends Logging { + + val taskName: TaskName = taskModel.getTaskName val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask] val isClosableTask = task.isInstanceOf[ClosableTask] val isAsyncTask = task.isInstanceOf[AsyncStreamTask] - val kvStoreSupplier = ScalaJavaUtil.toJavaFunction( + val epochTimeScheduler: EpochTimeScheduler = EpochTimeScheduler.create(timerExecutor) + + private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction( (storeName: String) => { if (storageManager != null && 
storageManager.getStore(storeName).isDefined) { storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]] @@ -77,9 +85,14 @@ class TaskInstance( null } }) - - val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager, - kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor) + private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager, + new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache) + // need separate field for this instead of using it through Context, since Context throws an exception if it is null + private val applicationTaskContextOption = applicationTaskContextFactoryOption.map(_.create(jobContext, + containerContext, taskContext, applicationContainerContextOption.orNull)) + val context = new ContextImpl(jobContext, containerContext, taskContext, + Optional.ofNullable(applicationContainerContextOption.orNull), + Optional.ofNullable(applicationTaskContextOption.orNull)) // store the (ssp -> if this ssp has caught up) mapping. 
"caught up" // means the same ssp in other taskInstances have the same offset as @@ -88,6 +101,8 @@ class TaskInstance( scala.collection.mutable.Map[SystemStreamPartition, Boolean]() systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false) + private val config: Config = jobContext.getConfig + val intermediateStreams: Set[String] = config.getStreamIds.filter(config.getIsIntermediateStream).toSet val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet @@ -126,7 +141,7 @@ class TaskInstance( if (tableManager != null) { debug("Starting table manager for taskName: %s" format taskName) - tableManager.init(containerContext, context) + tableManager.init(context) } else { debug("Skipping table manager initialization for taskName: %s" format taskName) } @@ -136,10 +151,14 @@ class TaskInstance( if (isInitableTask) { debug("Initializing task for taskName: %s" format taskName) - task.asInstanceOf[InitableTask].init(config, context) + task.asInstanceOf[InitableTask].init(context) } else { debug("Skipping task initialization for taskName: %s" format taskName) } + applicationTaskContextOption.foreach(applicationTaskContext => { + debug("Starting application-defined task context for taskName: %s" format taskName) + applicationTaskContext.start() + }) } def registerProducers { @@ -226,7 +245,7 @@ class TaskInstance( trace("Scheduler for taskName: %s" format taskName) exceptionHandler.maybeHandle { - context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry => + epochTimeScheduler.removeReadyTimers().entrySet().foreach { entry => entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator) } } @@ -266,6 +285,10 @@ class TaskInstance( } def shutdownTask { + applicationTaskContextOption.foreach(applicationTaskContext => { + debug("Stopping application-defined task context for taskName: %s" format taskName) + 
applicationTaskContext.stop() + }) if (task.isInstanceOf[ClosableTask]) { debug("Shutting down stream task for taskName: %s" format taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index bec4ec0..929d6a4 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -24,6 +24,7 @@ import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName} +import org.apache.samza.context.JobContextImpl import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.CoordinatorStreamManager import org.apache.samza.job.{StreamJob, StreamJobFactory} @@ -112,9 +113,12 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val container = SamzaContainer( containerId, jobModel, - config, Map[String, MetricsReporter](), - taskFactory) + taskFactory, + JobContextImpl.fromConfigWithDefaults(config), + Option(appDesc.getApplicationContainerContextFactory.orElse(null)), + Option(appDesc.getApplicationTaskContextFactory.orElse(null)) + ) container.setContainerListener(containerListener) val threadJob = new ThreadJob(container)
