This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 38e642d SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java (#1039)
38e642d is described below
commit 38e642d330acc6d08ab1f82580bb712f88ce7b7e
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Jun 6 13:33:35 2019 -0700
SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java (#1039)
* SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java
* moving constants to TaskConfigJava
* renaming TaskConfigJava to TaskConfig and modifying visibility on some
constants
* update private method access in TestJobCoordinator
* clean up unused methods in TaskConfig
* update javadocs, use Java Optional, move some defaults to TaskConfig
* post-merge fixes
* test and import fixes
* fix javadoc, clean up if-else check, fix format
---
.../samza/system/kinesis/KinesisSystemFactory.java | 4 +-
.../samza/coordinator/AzureJobCoordinator.java | 3 +-
.../apache/samza/application/ApplicationUtil.java | 7 +-
.../clustermanager/ClusterBasedJobCoordinator.java | 4 +-
.../apache/samza/config/DefaultChooserConfig.java | 8 +-
.../java/org/apache/samza/config/TaskConfig.java | 299 +++++++++++++++++
.../org/apache/samza/config/TaskConfigJava.java | 178 ----------
.../org/apache/samza/container/RunLoopFactory.java | 14 +-
.../stream/AllSspToSingleTaskGrouperFactory.java | 4 +-
.../container/grouper/stream/GroupByPartition.java | 4 +-
.../stream/GroupBySystemStreamPartition.java | 4 +-
.../container/grouper/stream/SSPGrouperProxy.java | 4 +-
.../samza/coordinator/MetadataResourceUtil.java | 6 +-
.../execution/JobNodeConfigurationGenerator.java | 9 +-
.../org/apache/samza/execution/JobPlanner.java | 6 +-
.../org/apache/samza/execution/StreamManager.java | 13 +-
.../apache/samza/processor/StreamProcessor.java | 4 +-
.../org/apache/samza/util/DiagnosticsUtil.java | 4 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 4 +-
.../apache/samza/checkpoint/CheckpointTool.scala | 15 +-
.../apache/samza/config/RegExTopicGenerator.scala | 8 +-
.../scala/org/apache/samza/config/TaskConfig.scala | 157 ---------
.../apache/samza/container/SamzaContainer.scala | 31 +-
.../container/TaskInstanceExceptionHandler.scala | 15 +-
.../apache/samza/coordinator/JobModelManager.scala | 11 +-
.../apache/samza/job/local/ProcessJobFactory.scala | 8 +-
.../apache/samza/job/local/ThreadJobFactory.scala | 2 +-
.../reporter/MetricsSnapshotReporterFactory.scala | 3 +-
.../samza/storage/ContainerStorageManager.java | 5 +-
.../org/apache/samza/system/SystemConsumers.scala | 5 +-
.../samza/system/chooser/DefaultChooser.scala | 4 +-
.../main/scala/org/apache/samza/util/Util.scala | 25 +-
.../samza/application/TestApplicationUtil.java | 10 +-
.../org/apache/samza/config/TestTaskConfig.java | 367 +++++++++++++++++++++
.../apache/samza/config/TestTaskConfigJava.java | 93 ------
.../samza/execution/TestExecutionPlanner.java | 10 +-
.../TestJobNodeConfigurationGenerator.java | 11 +-
.../TestTaskInstanceExceptionHandler.scala | 2 +-
.../samza/coordinator/TestJobCoordinator.scala | 4 +-
.../apache/samza/system/TestSystemConsumers.scala | 14 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 4 +-
.../apache/samza/logging/log4j/StreamAppender.java | 2 +-
.../samza/logging/log4j2/StreamAppender.java | 2 +-
.../samza/sql/client/impl/SamzaExecutor.java | 2 +-
.../apache/samza/sql/util/SamzaSqlTestConfig.java | 2 +-
.../apache/samza/test/framework/TestRunner.java | 12 +-
.../samza/processor/TestZkStreamProcessorBase.java | 4 +-
.../EndOfStreamIntegrationTest.java | 2 +-
.../controlmessages/WatermarkIntegrationTest.java | 4 +-
.../samza/test/framework/FaultInjectionTest.java | 4 +-
.../samza/test/operator/TestAsyncFlatMap.java | 2 +-
.../operator/TestRepartitionJoinWindowApp.java | 6 +-
.../test/operator/TestRepartitionWindowApp.java | 2 +-
.../processor/TestZkLocalApplicationRunner.java | 17 +-
.../samza/test/table/TestLocalTableEndToEnd.java | 2 +-
.../test/integration/StreamTaskTestUtil.scala | 17 +-
.../org/apache/samza/tools/SamzaSqlConsole.java | 2 +-
.../benchmark/SystemConsumerWithSamzaBench.java | 2 +-
58 files changed, 850 insertions(+), 617 deletions(-)
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
index 558e871..15b44bc 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import
org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
@@ -69,7 +69,7 @@ public class KinesisSystemFactory implements SystemFactory {
}
// Kinesis streams cannot be configured as broadcast streams
- TaskConfigJava taskConfig = new TaskConfigJava(config);
+ TaskConfig taskConfig = new TaskConfig(config);
if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss ->
system.equals(ss.getSystem()))) {
throw new ConfigException("Kinesis streams cannot be configured as
broadcast streams.");
}
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 8c4afdc..db082e3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -271,7 +271,8 @@ public class AzureJobCoordinator implements JobCoordinator {
*/
private Set<SystemStreamPartition> getInputStreamPartitions() {
TaskConfig taskConfig = new TaskConfig(config);
- scala.collection.immutable.Set<SystemStream> inputSystemStreams =
taskConfig.getInputStreams();
+ scala.collection.immutable.Set<SystemStream> inputSystemStreams =
+
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams()).asScala().toSet();
// Get the set of partitions for each SystemStream from the stream metadata
Set<SystemStreamPartition>
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index f779619..edcb3ae 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -18,12 +18,12 @@
*/
package org.apache.samza.application;
+import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.TaskConfig;
-import scala.Option;
/**
@@ -52,8 +52,9 @@ public class ApplicationUtil {
}
}
// no app.class defined. It has to be a legacy application with task.class
configuration
- Option<String> taskClassOption = new TaskConfig(config).getTaskClass();
- if (!taskClassOption.isDefined() ||
!StringUtils.isNotBlank(taskClassOption.getOrElse(null))) {
+ TaskConfig taskConfig = new TaskConfig(config);
+ Optional<String> taskClassOption = taskConfig.getTaskClass();
+ if (!taskClassOption.isPresent() ||
StringUtils.isBlank(taskClassOption.get())) {
// no task.class defined either. This is wrong.
throw new ConfigException("Legacy task applications must set a non-empty
task.class in configuration.");
}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index bfd036c..7338765 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -33,7 +33,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.InputStreamsDiscoveredException;
import org.apache.samza.coordinator.JobModelManager;
@@ -320,7 +320,7 @@ public class ClusterBasedJobCoordinator {
private Optional<StreamPartitionCountMonitor>
getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- Set<SystemStream> inputStreamsToMonitor = new
TaskConfigJava(config).getAllInputStreams();
+ Set<SystemStream> inputStreamsToMonitor = new
TaskConfig(config).getAllInputStreams();
if (inputStreamsToMonitor.isEmpty()) {
throw new SamzaException("Input streams to a job can not be empty.");
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index ff344c9..c753b4e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -32,12 +32,12 @@ import org.apache.samza.system.SystemStream;
public class DefaultChooserConfig extends MapConfig {
private static final String BATCH_SIZE = "task.consumer.batch.size";
- private final TaskConfigJava taskConfigJava;
+ private final TaskConfig taskConfig;
private final StreamConfig streamConfig;
public DefaultChooserConfig(Config config) {
super(config);
- taskConfigJava = new TaskConfigJava(config);
+ taskConfig = new TaskConfig(config);
streamConfig = new StreamConfig(config);
}
@@ -53,7 +53,7 @@ public class DefaultChooserConfig extends MapConfig {
*/
public Set<SystemStream> getBootstrapStreams() {
Set<SystemStream> bootstrapInputs = new HashSet<>();
- Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+ Set<SystemStream> allInputs = taskConfig.getAllInputStreams();
for (SystemStream systemStream : allInputs) {
if (streamConfig.getBootstrapEnabled(systemStream)) {
bootstrapInputs.add(systemStream);
@@ -71,7 +71,7 @@ public class DefaultChooserConfig extends MapConfig {
* were not explicitly configured with a priority are not returned.
*/
public Map<SystemStream, Integer> getPriorityStreams() {
- Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+ Set<SystemStream> allInputs = taskConfig.getAllInputStreams();
Map<SystemStream, Integer> priorityStreams = new HashMap<>();
for (SystemStream systemStream : allInputs) {
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
new file mode 100644
index 0000000..7c02054
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.StreamUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskConfig extends MapConfig {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(TaskConfig.class);
+
+ // comma-separated list of system-streams
+ public static final String INPUT_STREAMS = "task.inputs";
+ // window period in milliseconds
+ public static final String WINDOW_MS = "task.window.ms";
+ static final long DEFAULT_WINDOW_MS = -1L;
+ // commit period in milliseconds
+ public static final String COMMIT_MS = "task.commit.ms";
+ static final long DEFAULT_COMMIT_MS = 60000L;
+ // how long to wait for a clean shutdown
+ public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+ static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
+ // legacy config for specifying task class; replaced by SamzaApplication and
app.class
+ public static final String TASK_CLASS = "task.class";
+ // command builder to use for launching a Samza job
+ public static final String COMMAND_BUILDER = "task.command.class";
+ // message chooser for controlling stream consumption
+ public static final String MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class";
+ // define whether to drop the messages or not when deserialization fails
+ public static final String DROP_DESERIALIZATION_ERRORS =
"task.drop.deserialization.errors";
+ // define whether to drop the messages or not when serialization fails
+ public static final String DROP_SERIALIZATION_ERRORS =
"task.drop.serialization.errors";
+ // whether to ignore producer errors and drop the messages that failed to
send
+ public static final String DROP_PRODUCER_ERRORS =
"task.drop.producer.errors";
+ // exceptions to ignore in process and window
+ public static final String IGNORED_EXCEPTIONS = "task.ignored.exceptions";
+ // class name for task grouper
+ public static final String GROUPER_FACTORY = "task.name.grouper.factory";
+ // max number of messages to process concurrently
+ public static final String MAX_CONCURRENCY = "task.max.concurrency";
+ static final int DEFAULT_MAX_CONCURRENCY = 1;
+ // timeout for triggering a callback
+ public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
+ static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+ // enable async commit
+ public static final String ASYNC_COMMIT = "task.async.commit";
+ // maximum time to wait for a task worker to complete when there are no new
messages to handle
+ public static final String MAX_IDLE_MS = "task.max.idle.ms";
+ static final long DEFAULT_MAX_IDLE_MS = 10L;
+ /**
+ * Samza's container polls for more messages under two conditions. The first
+ * condition arises when there are simply no remaining buffered messages to
+ * process for any input SystemStreamPartition. The second condition arises
+ * when some input SystemStreamPartitions have empty buffers, but some do
+ * not. In the latter case, a polling interval is defined to determine how
+ * often to refresh the empty SystemStreamPartition buffers. By default,
+ * this interval is 50ms, which means that any empty SystemStreamPartition
+ * buffer will be refreshed at least every 50ms. A higher value here means
+ * that empty SystemStreamPartitions will be refreshed less often, which
+ * means more latency is introduced, but less CPU and network will be used.
+ * Decreasing this value means that empty SystemStreamPartitions are
+ * refreshed more frequently, thereby introducing less latency, but
+ * increasing CPU and network utilization.
+ */
+ public static final String POLL_INTERVAL_MS = "task.poll.interval.ms";
+ public static final int DEFAULT_POLL_INTERVAL_MS = 50;
+ // broadcast streams consumed by all tasks. e.g. kafka.foo#1
+ public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
+ private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
+ private static final String BROADCAST_STREAM_RANGE_PATTERN =
"^\\[[\\d]+\\-[\\d]+\\]$";
+ public static final String CHECKPOINT_MANAGER_FACTORY =
"task.checkpoint.factory";
+
+ public TaskConfig(Config config) {
+ super(config);
+ }
+
+ /**
+ * Get the input streams, not including the broadcast streams. Use {@link
#getAllInputStreams()} to also get the
+ * broadcast streams.
+ */
+ public Set<SystemStream> getInputStreams() {
+ Optional<String> inputStreams = Optional.ofNullable(get(INPUT_STREAMS));
+ if (!inputStreams.isPresent() || inputStreams.get().isEmpty()) {
+ return Collections.emptySet();
+ } else {
+ return Stream.of(inputStreams.get().split(","))
+ .map(systemStreamNames ->
StreamUtil.getSystemStreamFromNames(systemStreamNames.trim()))
+ .collect(Collectors.toSet());
+ }
+ }
+
+ public long getWindowMs() {
+ return getLong(WINDOW_MS, DEFAULT_WINDOW_MS);
+ }
+
+ public long getCommitMs() {
+ return getLong(COMMIT_MS, DEFAULT_COMMIT_MS);
+ }
+
+ public Optional<String> getTaskClass() {
+ return Optional.ofNullable(get(TASK_CLASS));
+ }
+
+ public String getCommandClass(String defaultCommandClass) {
+ return get(COMMAND_BUILDER, defaultCommandClass);
+ }
+
+ public String getMessageChooserClass() {
+ return
Optional.ofNullable(get(MESSAGE_CHOOSER_CLASS_NAME)).orElse(RoundRobinChooserFactory.class.getName());
+ }
+
+ public boolean getDropDeserializationErrors() {
+ return getBoolean(DROP_DESERIALIZATION_ERRORS, false);
+ }
+
+ public boolean getDropSerializationErrors() {
+ return getBoolean(DROP_SERIALIZATION_ERRORS, false);
+ }
+
+ public boolean getDropProducerErrors() {
+ return getBoolean(DROP_PRODUCER_ERRORS, false);
+ }
+
+ public int getPollIntervalMs() {
+ return getInt(POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
+ }
+
+ public Optional<String> getIgnoredExceptions() {
+ return Optional.ofNullable(get(IGNORED_EXCEPTIONS));
+ }
+
+ public String getTaskNameGrouperFactory() {
+ Optional<String> taskNameGrouperFactory =
Optional.ofNullable(get(GROUPER_FACTORY));
+ if (taskNameGrouperFactory.isPresent()) {
+ return taskNameGrouperFactory.get();
+ } else {
+ LOGGER.info(String.format("No %s configuration, using %s",
GROUPER_FACTORY,
+ GroupByContainerCountFactory.class.getName()));
+ return GroupByContainerCountFactory.class.getName();
+ }
+ }
+
+ public int getMaxConcurrency() {
+ return getInt(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
+ }
+
+ public long getCallbackTimeoutMs() {
+ return getLong(CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
+ }
+
+ public boolean getAsyncCommit() {
+ return getBoolean(ASYNC_COMMIT, false);
+ }
+
+ public long getMaxIdleMs() {
+ return getLong(MAX_IDLE_MS, DEFAULT_MAX_IDLE_MS);
+ }
+
+ /**
+ * Create the checkpoint manager
+ *
+ * @param metricsRegistry Registry of metrics to use. Can be null if not
using metrics.
+ * @return CheckpointManager object if checkpoint manager factory is
configured, otherwise empty.
+ */
+ public Optional<CheckpointManager> getCheckpointManager(MetricsRegistry
metricsRegistry, ClassLoader classLoader) {
+ return Optional.ofNullable(get(CHECKPOINT_MANAGER_FACTORY))
+ .filter(StringUtils::isNotBlank)
+ .map(checkpointManagerFactoryName ->
ReflectionUtil.getObj(classLoader, checkpointManagerFactoryName,
+ CheckpointManagerFactory.class).getCheckpointManager(this,
metricsRegistry));
+ }
+
+ /**
+ * Get the systemStreamPartitions of the broadcast stream. Specifying
+ * one partition for one stream or a range of the partitions for one
+ * stream is allowed.
+ *
+ * @return a Set of SystemStreamPartitions
+ */
+ public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
+ HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<>();
+ List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS,
Collections.emptyList());
+
+ for (String systemStreamPartition : systemStreamPartitions) {
+ int hashPosition = systemStreamPartition.indexOf("#");
+ if (hashPosition == -1) {
+ throw new IllegalArgumentException("incorrect format in " +
systemStreamPartition
+ + ". Broadcast stream names should be in the form
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+ } else {
+ String systemStreamName = systemStreamPartition.substring(0,
hashPosition);
+ String partitionSegment = systemStreamPartition.substring(hashPosition
+ 1);
+ SystemStream systemStream =
StreamUtil.getSystemStreamFromNames(systemStreamName);
+
+ if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
+ systemStreamPartitionSet.add(new SystemStreamPartition(systemStream,
new Partition(Integer.valueOf(partitionSegment))));
+ } else {
+ if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN,
partitionSegment)) {
+ int partitionStart = Integer.valueOf(partitionSegment.substring(1,
partitionSegment.lastIndexOf("-")));
+ int partitionEnd =
Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") +
1, partitionSegment.indexOf("]")));
+ if (partitionStart > partitionEnd) {
+ LOGGER.warn("The starting partition in stream " +
systemStream.toString() + " is bigger than the ending Partition. No partition
is added");
+ }
+ for (int i = partitionStart; i <= partitionEnd; i++) {
+ systemStreamPartitionSet.add(new
SystemStreamPartition(systemStream, new Partition(i)));
+ }
+ } else {
+ throw new IllegalArgumentException("incorrect format in " +
systemStreamPartition
+ + ". Broadcast stream names should be in the form
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+ }
+ }
+ }
+ }
+ return systemStreamPartitionSet;
+ }
+
+ /**
+ * Get the SystemStreams for the configured broadcast streams.
+ *
+ * @return the set of SystemStreams for which there are broadcast stream
SSPs configured.
+ */
+ public Set<SystemStream> getBroadcastSystemStreams() {
+ Set<SystemStream> broadcastSS = new HashSet<>();
+ Set<SystemStreamPartition> broadcastSSPs =
getBroadcastSystemStreamPartitions();
+ for (SystemStreamPartition bssp : broadcastSSPs) {
+ broadcastSS.add(bssp.getSystemStream());
+ }
+ return Collections.unmodifiableSet(broadcastSS);
+ }
+
+ /**
+ * Get the SystemStreams for the configured input and broadcast streams.
+ *
+ * @return the set of SystemStreams for both standard inputs and broadcast
stream inputs.
+ */
+ public Set<SystemStream> getAllInputStreams() {
+ Set<SystemStream> allInputSS = new HashSet<>();
+
+ allInputSS.addAll(getInputStreams());
+ allInputSS.addAll(getBroadcastSystemStreams());
+
+ return Collections.unmodifiableSet(allInputSS);
+ }
+
+ /**
+ * Returns a value indicating how long to wait for the tasks to shutdown
+ * If the value is not defined in the config or if does not parse correctly,
we return the default value -
+ * {@value #DEFAULT_TASK_SHUTDOWN_MS}
+ *
+ * @return Long value indicating how long to wait for all the tasks to
shutdown
+ */
+ public long getShutdownMs() {
+ String shutdownMs = get(TASK_SHUTDOWN_MS);
+ try {
+ return Long.parseLong(shutdownMs);
+ } catch (NumberFormatException nfe) {
+ LOGGER.warn(String.format(
+ "Unable to parse user-configure value for %s - %s. Using default
value %d",
+ TASK_SHUTDOWN_MS,
+ shutdownMs,
+ DEFAULT_TASK_SHUTDOWN_MS));
+ return DEFAULT_TASK_SHUTDOWN_MS;
+ }
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
deleted file mode 100644
index 6ef41e9..0000000
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.Partition;
-import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.checkpoint.CheckpointManagerFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.ReflectionUtil;
-import org.apache.samza.util.StreamUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-
-
-public class TaskConfigJava extends MapConfig {
- // Task Configs
- public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
- public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
-
- // broadcast streams consumed by all tasks. e.g. kafka.foo#1
- public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
- private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
- private static final String BROADCAST_STREAM_RANGE_PATTERN =
"^\\[[\\d]+\\-[\\d]+\\]$";
-
- // class name to use when sending offset checkpoints
- public static final String CHECKPOINT_MANAGER_FACTORY =
"task.checkpoint.factory";
-
- public static final Logger LOGGER =
LoggerFactory.getLogger(TaskConfigJava.class);
-
- public TaskConfigJava(Config config) {
- super(config);
- }
-
- /**
- * Get the name of the checkpoint manager factory
- *
- * @return Name of checkpoint manager factory
- */
- public String getCheckpointManagerFactoryName() {
- return get(CHECKPOINT_MANAGER_FACTORY, null);
- }
-
- /**
- * Create the checkpoint manager
- *
- * @param metricsRegistry Registry of metrics to use. Can be null if not
using metrics.
- * @return CheckpointManager object if checkpoint manager factory is
configured, otherwise null.
- */
- public CheckpointManager getCheckpointManager(MetricsRegistry
metricsRegistry, ClassLoader classLoader) {
- // Initialize checkpoint streams during job coordination
- String checkpointManagerFactoryName = getCheckpointManagerFactoryName();
- if (StringUtils.isNotBlank(checkpointManagerFactoryName)) {
- return ReflectionUtil.getObj(classLoader, checkpointManagerFactoryName,
CheckpointManagerFactory.class)
- .getCheckpointManager(this, metricsRegistry);
- }
- return null;
- }
-
- /**
- * Get the systemStreamPartitions of the broadcast stream. Specifying
- * one partition for one stream or a range of the partitions for one
- * stream is allowed.
- *
- * @return a Set of SystemStreamPartitions
- */
- public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
- HashSet<SystemStreamPartition> systemStreamPartitionSet = new
HashSet<SystemStreamPartition>();
- List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS,
Collections.<String>emptyList());
-
- for (String systemStreamPartition : systemStreamPartitions) {
- int hashPosition = systemStreamPartition.indexOf("#");
- if (hashPosition == -1) {
- throw new IllegalArgumentException("incorrect format in " +
systemStreamPartition
- + ". Broadcast stream names should be in the form
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
- } else {
- String systemStreamName = systemStreamPartition.substring(0,
hashPosition);
- String partitionSegment = systemStreamPartition.substring(hashPosition
+ 1);
- SystemStream systemStream =
StreamUtil.getSystemStreamFromNames(systemStreamName);
-
- if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
- systemStreamPartitionSet.add(new SystemStreamPartition(systemStream,
new Partition(Integer.valueOf(partitionSegment))));
- } else {
- if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN,
partitionSegment)) {
- int partitionStart = Integer.valueOf(partitionSegment.substring(1,
partitionSegment.lastIndexOf("-")));
- int partitionEnd =
Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") +
1, partitionSegment.indexOf("]")));
- if (partitionStart > partitionEnd) {
- LOGGER.warn("The starting partition in stream " +
systemStream.toString() + " is bigger than the ending Partition. No partition
is added");
- }
- for (int i = partitionStart; i <= partitionEnd; i++) {
- systemStreamPartitionSet.add(new
SystemStreamPartition(systemStream, new Partition(i)));
- }
- } else {
- throw new IllegalArgumentException("incorrect format in " +
systemStreamPartition
- + ". Broadcast stream names should be in the form
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
- }
- }
- }
- }
- return systemStreamPartitionSet;
- }
-
- /**
- * Get the SystemStreams for the configured broadcast streams.
- *
- * @return the set of SystemStreams for which there are broadcast stream
SSPs configured.
- */
- public Set<SystemStream> getBroadcastSystemStreams() {
- Set<SystemStream> broadcastSS = new HashSet<>();
- Set<SystemStreamPartition> broadcastSSPs =
getBroadcastSystemStreamPartitions();
- for (SystemStreamPartition bssp : broadcastSSPs) {
- broadcastSS.add(bssp.getSystemStream());
- }
- return Collections.unmodifiableSet(broadcastSS);
- }
-
- /**
- * Get the SystemStreams for the configured input and broadcast streams.
- *
- * @return the set of SystemStreams for both standard inputs and broadcast
stream inputs.
- */
- public Set<SystemStream> getAllInputStreams() {
- Set<SystemStream> allInputSS = new HashSet<>();
-
- TaskConfig taskConfig = TaskConfig.Config2Task(this);
- allInputSS.addAll((Set<? extends SystemStream>)
JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
- allInputSS.addAll(getBroadcastSystemStreams());
-
- return Collections.unmodifiableSet(allInputSS);
- }
-
- /**
- * Returns a value indicating how long to wait for the tasks to shutdown
- * If the value is not defined in the config or if does not parse correctly,
we return the default value -
- * {@value #DEFAULT_TASK_SHUTDOWN_MS}
- *
- * @return Long value indicating how long to wait for all the tasks to
shutdown
- */
- public long getShutdownMs() {
- String shutdownMs = get(TASK_SHUTDOWN_MS);
- try {
- return Long.parseLong(shutdownMs);
- } catch (NumberFormatException nfe) {
- LOGGER.warn(String.format(
- "Unable to parse user-configure value for %s - %s. Using default
value %d",
- TASK_SHUTDOWN_MS,
- shutdownMs,
- DEFAULT_TASK_SHUTDOWN_MS));
- return DEFAULT_TASK_SHUTDOWN_MS;
- }
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index b50a270..0e0a01c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -41,14 +41,14 @@ public class RunLoopFactory {
ExecutorService threadPool,
long maxThrottlingDelayMs,
SamzaContainerMetrics containerMetrics,
- TaskConfig config,
+ TaskConfig taskConfig,
HighResolutionClock clock) {
- long taskWindowMs = config.getWindowMs();
+ long taskWindowMs = taskConfig.getWindowMs();
log.info("Got window milliseconds: {}.", taskWindowMs);
- long taskCommitMs = config.getCommitMs();
+ long taskCommitMs = taskConfig.getCommitMs();
log.info("Got commit milliseconds: {}.", taskCommitMs);
@@ -64,16 +64,16 @@ public class RunLoopFactory {
throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not
supported");
}
- int taskMaxConcurrency = config.getMaxConcurrency();
+ int taskMaxConcurrency = taskConfig.getMaxConcurrency();
log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);
- boolean isAsyncCommitEnabled = config.getAsyncCommit();
+ boolean isAsyncCommitEnabled = taskConfig.getAsyncCommit();
log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);
- long callbackTimeout = config.getCallbackTimeoutMs();
+ long callbackTimeout = taskConfig.getCallbackTimeoutMs();
log.info("Got callbackTimeout: {}.", callbackTimeout);
- long maxIdleMs = config.getMaxIdleMs();
+ long maxIdleMs = taskConfig.getMaxIdleMs();
log.info("Got maxIdleMs: {}.", maxIdleMs);
log.info("Run loop in asynchronous mode.");
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
index d3c5080..794a1f9 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
@@ -29,7 +29,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -73,7 +73,7 @@ class AllSspToSingleTaskGrouper implements
SystemStreamPartitionGrouper {
public class AllSspToSingleTaskGrouperFactory implements
SystemStreamPartitionGrouperFactory {
@Override
public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config
config) {
- if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) {
+ if (!(new TaskConfig(config).getBroadcastSystemStreams().isEmpty())) {
throw new ConfigException("The job configured with
AllSspToSingleTaskGrouper cannot have broadcast streams.");
}
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 9542791..154aa67 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -38,7 +38,7 @@ public class GroupByPartition implements
SystemStreamPartitionGrouper {
private final Set<SystemStreamPartition> broadcastStreams;
public GroupByPartition(Config config) {
- TaskConfigJava taskConfig = new TaskConfigJava(config);
+ TaskConfig taskConfig = new TaskConfig(config);
this.broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
}
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index 90828fd..9f3fbf1 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -37,7 +37,7 @@ public class GroupBySystemStreamPartition implements
SystemStreamPartitionGroupe
private final Set<SystemStreamPartition> broadcastStreams;
public GroupBySystemStreamPartition(Config config) {
- TaskConfigJava taskConfig = new TaskConfigJava(config);
+ TaskConfig taskConfig = new TaskConfig(config);
broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
}
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
index a24a828..29b27c5 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
@@ -29,7 +29,7 @@ import java.util.HashSet;
import com.google.common.base.Preconditions;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -55,7 +55,7 @@ public class SSPGrouperProxy {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(grouper);
this.grouper = grouper;
- this.broadcastSystemStreamPartitions = new
TaskConfigJava(config).getBroadcastSystemStreamPartitions();
+ this.broadcastSystemStreamPartitions = new
TaskConfig(config).getBroadcastSystemStreamPartitions();
this.systemStreamPartitionMapper = getSystemStreamPartitionMapper(config,
classLoader);
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 97037a5..148e942 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -21,7 +21,7 @@ package org.apache.samza.coordinator;
import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.storage.ChangelogStreamManager;
@@ -41,8 +41,8 @@ public class MetadataResourceUtil {
*/
public MetadataResourceUtil(JobModel jobModel, MetricsRegistry
metricsRegistry, ClassLoader classLoader) {
this.jobModel = jobModel;
- this.checkpointManager =
- new
TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metricsRegistry,
classLoader);
+ TaskConfig taskConfig = new TaskConfig(jobModel.getConfig());
+ this.checkpointManager = taskConfig.getCheckpointManager(metricsRegistry,
classLoader).orElse(null);
}
@VisibleForTesting
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 15a1801..fdcce38 100644
---
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -41,7 +41,6 @@ import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -164,7 +163,7 @@ import org.slf4j.LoggerFactory;
configureTables(generatedConfig, originalConfig, reachableTables, inputs);
// generate the task.inputs configuration
- generatedConfig.put(TaskConfig.INPUT_STREAMS(),
Joiner.on(',').join(inputs));
+ generatedConfig.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(),
generatedConfig);
@@ -182,11 +181,11 @@ import org.slf4j.LoggerFactory;
if (broadcastStreams.isEmpty()) {
return;
}
- String broadcastInputs =
config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
+ String broadcastInputs = config.get(TaskConfig.BROADCAST_INPUT_STREAMS);
if (StringUtils.isNotBlank(broadcastInputs)) {
broadcastStreams.add(broadcastInputs);
}
- configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS,
Joiner.on(',').join(broadcastStreams));
+ configs.put(TaskConfig.BROADCAST_INPUT_STREAMS,
Joiner.on(',').join(broadcastStreams));
}
private void configureWindowInterval(Map<String, String> configs, Config
config,
@@ -200,7 +199,7 @@ import org.slf4j.LoggerFactory;
long triggerInterval = computeTriggerInterval(reachableOperators);
LOG.info("Using triggering interval: {}", triggerInterval);
- configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
+ configs.put(TaskConfig.WINDOW_MS, String.valueOf(triggerInterval));
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f495870..577964e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -78,11 +78,11 @@ public abstract class JobPlanner {
// TODO: This should all be consolidated with ExecutionPlanner after
fixing SAMZA-1811
// Don't generate any configurations for LegacyTaskApplications
if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
- if (userConfig.containsKey(TaskConfig.INPUT_STREAMS())) {
+ if (userConfig.containsKey(TaskConfig.INPUT_STREAMS)) {
LOG.warn("SamzaApplications should not specify task.inputs in
configuration. " +
"Specify them using InputDescriptors instead. Ignoring configured
task.inputs value of " +
- userConfig.get(TaskConfig.INPUT_STREAMS()));
- allowedUserConfig.remove(TaskConfig.INPUT_STREAMS());
+ userConfig.get(TaskConfig.INPUT_STREAMS));
+ allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
}
generatedConfig.putAll(getGeneratedConfig(runId));
}
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 6c308af..fe0d9fd 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.checkpoint.CheckpointManagerFactory;
import org.apache.samza.config.*;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
@@ -38,13 +37,11 @@ import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
-import static org.apache.samza.util.ScalaJavaUtil.defaultValue;
public class StreamManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamManager.class);
@@ -127,14 +124,8 @@ public class StreamManager {
//Find checkpoint stream and clean up
TaskConfig taskConfig = new TaskConfig(prevConfig);
- String checkpointManagerFactoryClassName =
taskConfig.getCheckpointManagerFactory()
- .getOrElse(defaultValue(null));
- if (checkpointManagerFactoryClassName != null) {
- CheckpointManager checkpointManager =
- ReflectionUtil.getObj(classLoader,
checkpointManagerFactoryClassName, CheckpointManagerFactory.class)
- .getCheckpointManager(prevConfig, new MetricsRegistryMap());
- checkpointManager.clearCheckpoints();
- }
+ taskConfig.getCheckpointManager(new MetricsRegistryMap(), classLoader)
+ .ifPresent(CheckpointManager::clearCheckpoints);
//Find changelog streams and remove them
StorageConfig storageConfig = new StorageConfig(prevConfig);
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 6e45a02..3ba200b 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -37,7 +37,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.context.ApplicationContainerContext;
@@ -247,7 +247,7 @@ public class StreamProcessor {
this.applicationDefinedContainerContextFactoryOptional =
applicationDefinedContainerContextFactoryOptional;
this.applicationDefinedTaskContextFactoryOptional =
applicationDefinedTaskContextFactoryOptional;
this.externalContextOptional = externalContextOptional;
- this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+ this.taskShutdownMs = new TaskConfig(config).getShutdownMs();
this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator :
createJobCoordinator();
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 3c05228..6060720 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -29,7 +29,7 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.SystemConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.reporter.Metrics;
@@ -111,7 +111,7 @@ public class DiagnosticsUtil {
SystemFactory systemFactory =
Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
SystemProducer systemProducer =
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new
MetricsRegistryMap());
DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName,
jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
- samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfigJava(config).getShutdownMs()));
+ samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
MetricsSnapshotReporter diagnosticsReporter =
new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream,
publishInterval, jobName, jobId,
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index fcc73ef..b0b9836 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -34,7 +34,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadata;
@@ -368,7 +368,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor getPartitionCountMonitor() {
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- Set<SystemStream> inputStreamsToMonitor = new
TaskConfigJava(config).getAllInputStreams();
+ Set<SystemStream> inputStreamsToMonitor = new
TaskConfig(config).getAllInputStreams();
return new StreamPartitionCountMonitor(
inputStreamsToMonitor,
diff --git
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 31cbda5..7c0cedd 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -21,12 +21,12 @@ package org.apache.samza.checkpoint
import java.net.URI
import java.util
+import java.util.function.Supplier
import java.util.regex.Pattern
import joptsimple.ArgumentAcceptingOptionSpec
import joptsimple.OptionSet
import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.job.JobRunner.info
@@ -42,6 +42,7 @@ import
org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, Names
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.execution.JobPlanner
import org.apache.samza.storage.ChangelogStreamManager
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.mutable.ListBuffer
@@ -158,14 +159,12 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap,
coordinatorStreamStore
combinedConfigMap.putAll(userDefinedConfig)
val combinedConfig: Config = new MapConfig(combinedConfigMap)
+ val taskConfig = new TaskConfig(combinedConfig)
// Instantiate the checkpoint manager with coordinator stream
configuration.
- val checkpointManager: CheckpointManager =
combinedConfig.getCheckpointManagerFactory() match {
- case Some(className) =>
- ReflectionUtil.getObj(classLoader, className,
classOf[CheckpointManagerFactory])
- .getCheckpointManager(combinedConfig, new MetricsRegistryMap)
- case _ =>
- throw new SamzaException("Configuration: task.checkpoint.factory is
not defined.")
- }
+ val checkpointManager: CheckpointManager =
+ JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new
MetricsRegistryMap, classLoader))
+ .toOption
+ .getOrElse(throw new SamzaException("Configuration:
task.checkpoint.factory is not defined."))
try {
// Find all the TaskNames that would be generated for this job config
val changelogManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
diff --git
a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 5af55c7..3dcdf05 100644
---
a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -21,9 +21,8 @@ package org.apache.samza.config
import org.apache.samza.SamzaException
import org.apache.samza.config.JobConfig.REGEX_RESOLVED_STREAMS
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.system.{StreamMetadataCache, SystemAdmins,
SystemStream}
-import org.apache.samza.util.{Logging, StreamUtil, SystemClock}
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.{Logging, StreamUtil}
import scala.collection.JavaConverters._
import scala.collection._
@@ -56,7 +55,8 @@ class RegExTopicGenerator extends ConfigRewriter with Logging
{
.getRegexResolvedSystem(rewriterName)
.getOrElse(throw new SamzaException("No system defined for %s." format
rewriterName))
val topics = getTopicsFromSystemAdmin(rewriterName, config)
- val existingInputStreams = config.getInputStreams
+ val taskConfig = new TaskConfig(config)
+ val existingInputStreams =
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
val newInputStreams = new mutable.HashSet[SystemStream]
val keysAndValsToAdd = new mutable.HashMap[String, String]
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
deleted file mode 100644
index a887355..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStream
-import org.apache.samza.util.{Logging, StreamUtil}
-
-object TaskConfig {
- // task config constants
- val INPUT_STREAMS = "task.inputs" // streaming.input-streams
- val WINDOW_MS = "task.window.ms" // window period in milliseconds
- val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
- val SHUTDOWN_MS = "task.shutdown.ms" // how long to wait for a clean shutdown
- val TASK_CLASS = "task.class" // streaming.task-factory-class
- val COMMAND_BUILDER = "task.command.class" // streaming.task-factory-class
- val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
- val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" //
task.lifecycle.listener.li-generator.class
- val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY
// class name to use when sending offset checkpoints
- val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
- val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" //
define whether drop the messages or not when deserialization fails
- val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define
whether drop the messages or not when serialization fails
- val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore
producer errors and drop the messages that failed to send
- val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore
in process and window
- val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task
grouper
- val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent
process for a AsyncStreamTask
- val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for
triggering a callback
- val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a
AsyncStreamTask
- val MAX_IDLE_MS = "task.max.idle.ms" // maximum time to wait for a task
worker to complete when there are no new messages to handle
-
- val DEFAULT_WINDOW_MS: Long = -1L
- val DEFAULT_COMMIT_MS = 60000L
- val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
- val DEFAULT_MAX_CONCURRENCY: Int = 1
- val DEFAULT_MAX_IDLE_MS: Long = 10
-
- /**
- * Samza's container polls for more messages under two conditions. The first
- * condition arises when there are simply no remaining buffered messages to
- * process for any input SystemStreamPartition. The second condition arises
- * when some input SystemStreamPartitions have empty buffers, but some do
- * not. In the latter case, a polling interval is defined to determine how
- * often to refresh the empty SystemStreamPartition buffers. By default,
- * this interval is 50ms, which means that any empty SystemStreamPartition
- * buffer will be refreshed at least every 50ms. A higher value here means
- * that empty SystemStreamPartitions will be refreshed less often, which
- * means more latency is introduced, but less CPU and network will be used.
- * Decreasing this value means that empty SystemStreamPartitions are
- * refreshed more frequently, thereby introducing less latency, but
- * increasing CPU and network utilization.
- */
- val POLL_INTERVAL_MS = "task.poll.interval.ms"
-
- implicit def Config2Task(config: Config) = new TaskConfig(config)
-}
-
-class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
- val javaTaskConfig = new TaskConfigJava(config)
-
- def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
- case Some(streams) => if (streams.length > 0) {
- streams.split(",").map(systemStreamNames => {
- StreamUtil.getSystemStreamFromNames(systemStreamNames.trim)
- }).toSet
- } else {
- Set[SystemStream]()
- }
- case _ => Set[SystemStream]()
- }
-
- def getWindowMs: Long = getOption(TaskConfig.WINDOW_MS) match {
- case Some(ms) => ms.toLong
- case _ => TaskConfig.DEFAULT_WINDOW_MS
- }
-
- def getCommitMs: Long = getOption(TaskConfig.COMMIT_MS) match {
- case Some(ms) => ms.toLong
- case _ => TaskConfig.DEFAULT_COMMIT_MS
- }
-
- def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
- case Some(ms) => Some(ms.toLong)
- case _ => None
- }
-
- def getTaskClass = getOption(TaskConfig.TASK_CLASS)
-
- def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
-
- def getCommandClass(defaultValue: String) =
getOrElse(TaskConfig.COMMAND_BUILDER, defaultValue)
-
- def getCheckpointManagerFactory() =
Option(javaTaskConfig.getCheckpointManagerFactoryName)
-
- def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
-
- def getDropDeserializationErrors =
getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false)
-
- def getDropSerializationErrors =
getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false)
-
- def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS,
false)
-
- def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
-
- def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
-
- def getTaskNameGrouperFactory = {
- getOption(TaskConfig.GROUPER_FACTORY) match {
- case Some(grouperFactory) => grouperFactory
- case _ =>
- info("No %s configuration, using
'org.apache.samza.container.grouper.task.GroupByContainerCountFactory'" format
TaskConfig.GROUPER_FACTORY)
- "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
- }
- }
-
- def getMaxConcurrency: Int = getOption(TaskConfig.MAX_CONCURRENCY) match {
- case Some(count) => count.toInt
- case _ => TaskConfig.DEFAULT_MAX_CONCURRENCY
- }
-
- def getCallbackTimeoutMs: Long = getOption(TaskConfig.CALLBACK_TIMEOUT_MS)
match {
- case Some(ms) => ms.toLong
- case _ => TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS
- }
-
- def getAsyncCommit: Boolean = getOption(TaskConfig.ASYNC_COMMIT) match {
- case Some(asyncCommit) => asyncCommit.toBoolean
- case _ => false
- }
-
- def isAutoCommitEnabled: Boolean = getOption(TaskConfig.COMMIT_MS) match {
- case Some(commitMs) => commitMs.toInt > 0
- case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
- }
-
- def getMaxIdleMs: Long = getOption(TaskConfig.MAX_IDLE_MS) match {
- case Some(ms) => ms.toLong
- case _ => TaskConfig.DEFAULT_MAX_IDLE_MS
- }
-}
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e1cd92a..9734cb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,6 @@ package org.apache.samza.container
import java.io.File
import java.lang.management.ManagementFactory
-import java.lang.reflect.InvocationTargetException
import java.net.{URL, UnknownHostException}
import java.nio.file.Path
import java.time.Duration
@@ -31,12 +30,11 @@ import java.util.concurrent.{ExecutorService, Executors,
ScheduledExecutorServic
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.samza.checkpoint.{CheckpointListener,
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
+import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager,
OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.{DiskQuotaPolicyFactory,
DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory,
PollingScanDiskSpaceMonitor}
@@ -50,7 +48,7 @@ import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.startpoint.StartpointManager
import org.apache.samza.storage._
import org.apache.samza.system._
-import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory,
RoundRobinChooserFactory}
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory}
import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -403,7 +401,8 @@ object SamzaContainer extends Logging {
info("Setting up message chooser.")
- val chooserFactoryClassName =
config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
+ val taskConfig = new TaskConfig(config)
+ val chooserFactoryClassName = taskConfig.getMessageChooserClass
val chooserFactory = ReflectionUtil.getObj(classLoader,
chooserFactoryClassName, classOf[MessageChooserFactory])
@@ -424,11 +423,7 @@ object SamzaContainer extends Logging {
}
info("Got security manager: %s" format securityManager)
- val checkpointManager = config.getCheckpointManagerFactory()
- .filterNot(_.isEmpty)
- .map(ReflectionUtil.getObj(classLoader, _,
classOf[CheckpointManagerFactory])
- .getCheckpointManager(config, samzaContainerMetrics.registry))
- .orNull
+ val checkpointManager =
taskConfig.getCheckpointManager(samzaContainerMetrics.registry,
classLoader).orElse(null)
info("Got checkpoint manager: %s" format checkpointManager)
// create a map of consumers with callbacks to pass to the OffsetManager
@@ -439,13 +434,10 @@ object SamzaContainer extends Logging {
val offsetManager = OffsetManager(inputStreamMetadata, config,
checkpointManager, startpointManager, systemAdmins, checkpointListeners,
offsetManagerMetrics)
info("Got offset manager: %s" format offsetManager)
- val dropDeserializationError = config.getDropDeserializationErrors
- val dropSerializationError = config.getDropSerializationErrors
+ val dropDeserializationError = taskConfig.getDropDeserializationErrors
+ val dropSerializationError = taskConfig.getDropSerializationErrors
- val pollIntervalMs = config
- .getPollIntervalMs
- .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
- .toInt
+ val pollIntervalMs = taskConfig.getPollIntervalMs
val consumerMultiplexer = new SystemConsumers(
chooser = chooser,
@@ -585,7 +577,7 @@ object SamzaContainer extends Logging {
storageManager = storageManager,
tableManager = tableManager,
systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
- exceptionHandler =
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
+ exceptionHandler =
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, taskConfig),
jobModel = jobModel,
streamMetadataCache = streamMetadataCache,
timerExecutor = timerExecutor,
@@ -609,7 +601,7 @@ object SamzaContainer extends Logging {
taskThreadPool,
maxThrottlingDelayMs,
samzaContainerMetrics,
- config,
+ taskConfig,
clock)
val memoryStatisticsMonitor : SystemStatisticsMonitor = new
StatisticsMonitorImpl()
@@ -713,7 +705,8 @@ class SamzaContainer(
containerStorageManager: ContainerStorageManager,
diagnosticsManager: Option[DiagnosticsManager] = Option.empty) extends
Runnable with Logging {
- val shutdownMs =
config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+ private val taskConfig = new TaskConfig(config)
+ val shutdownMs: Long = taskConfig.getShutdownMs
var shutdownHookThread: Thread = null
var jmxServer: JmxServer = null
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
index 99b729f..7216b4c 100644
---
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
+++
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
@@ -19,11 +19,11 @@
package org.apache.samza.container
-import org.apache.samza.config.Config
-import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.TaskConfig
import org.apache.samza.metrics.Counter
import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.util.Logging
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
/**
* Handles exceptions thrown in a {@link TaskInstance}'s process or window
@@ -88,16 +88,15 @@ object TaskInstanceExceptionHandler {
* Creates a new TaskInstanceExceptionHandler using the provided
* configuration.
*
- * @param metrics The {@link TaskInstanceMetrics} used to track exception
- * counts.
- * @param config The configuration to read the list of ignored exceptions
- * from.
+ * @param metrics The {@link TaskInstanceMetrics} used to track exception
counts.
+ * @param taskConfig The configuration to read the list of ignored
exceptions from.
*/
- def apply(metrics: MetricsHelper, config: Config) =
+ def apply(metrics: MetricsHelper, taskConfig: TaskConfig) = {
new TaskInstanceExceptionHandler(
metrics = metrics,
- ignoredExceptions = config.getIgnoredExceptions match {
+ ignoredExceptions =
JavaOptionals.toRichOptional(taskConfig.getIgnoredExceptions).toOption match {
case Some(exceptions) => exceptions.split(",").toSet
case _ => Set[String]()
})
+ }
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 06fdced..04a3ec2 100644
---
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.Config
import org.apache.samza.container.grouper.stream.SSPGrouperProxy
import
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -50,6 +49,7 @@ import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
import org.apache.samza.util.{Logging, ReflectionUtil, Util}
+import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -270,7 +270,6 @@ object JobModelManager extends Logging {
*/
private def getInputStreamPartitions(config: Config, streamMetadataCache:
StreamMetadataCache): Set[SystemStreamPartition] = {
-
def invokeRegexTopicRewriter(config: Config): Config = {
config.getConfigRewriters match {
case Some(rewriters) => rewriters.split(",").
@@ -282,8 +281,11 @@ object JobModelManager extends Logging {
}
}
+ val configAfterRegexTopicRewrite = invokeRegexTopicRewriter(config)
+ val taskConfigAfterRegexTopicRewrite = new
TaskConfig(configAfterRegexTopicRewrite)
// Expand regex input, if a regex-rewriter is defined in config
- val inputSystemStreams = invokeRegexTopicRewriter(config).getInputStreams
+ val inputSystemStreams =
+
JavaConverters.asScalaSetConverter(taskConfigAfterRegexTopicRewrite.getInputStreams).asScala.toSet
// Get the set of partitions for each SystemStream from the stream metadata
streamMetadataCache
@@ -353,6 +355,7 @@ object JobModelManager extends Logging {
streamMetadataCache: StreamMetadataCache,
grouperMetadata: GrouperMetadata,
classLoader: ClassLoader): JobModel = {
+ val taskConfig = new TaskConfig(config)
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getMatchedInputStreamPartitions(config,
streamMetadataCache, classLoader)
@@ -397,7 +400,7 @@ object JobModelManager extends Logging {
// Here is where we should put in a pluggable option for the
// SSPTaskNameGrouper for locality, load-balancing, etc.
val containerGrouperFactory =
- ReflectionUtil.getObj(classLoader, config.getTaskNameGrouperFactory,
classOf[TaskNameGrouperFactory])
+ ReflectionUtil.getObj(classLoader, taskConfig.getTaskNameGrouperFactory,
classOf[TaskNameGrouperFactory])
val standbyTasksEnabled = new JobConfig(config).getStandbyTasksEnabled
val standbyTaskReplicationFactor = new
JobConfig(config).getStandbyTaskReplicationFactor
val taskNameGrouperProxy = new
TaskNameGrouperProxy(containerGrouperFactory.build(config),
standbyTasksEnabled, standbyTaskReplicationFactor)
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 72a0fff..665cd15 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,8 +22,7 @@ package org.apache.samza.job.local
import java.util
import org.apache.samza.SamzaException
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.config.{Config, JobConfig}
+import org.apache.samza.config.{Config, JobConfig, TaskConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
@@ -88,8 +87,9 @@ class ProcessJobFactory extends StreamJobFactory with Logging
{
val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is
configured
info("Process job. using fwkPath = " + fwkPath)
- val commandBuilderClass =
config.getCommandClass(classOf[ShellCommandBuilder].getName)
- info("Using command builder class " + commandBuilderClass)
+ val taskConfig = new TaskConfig(config)
+ val commandBuilderClass =
taskConfig.getCommandClass(classOf[ShellCommandBuilder].getName)
+ info("Using command builder class %s" format commandBuilderClass)
val commandBuilder = ReflectionUtil.getObj(classLoader,
commandBuilderClass, classOf[CommandBuilder])
// JobCoordinator is stopped by ProcessJob when it exits
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index b698583..f9116fe 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,7 +23,7 @@ import org.apache.samza.application.ApplicationUtil
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
+import org.apache.samza.config.{Config, JobConfig, TaskConfig}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener,
TaskName}
import org.apache.samza.context.{ExternalContext, JobContextImpl}
import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 957cd6f..74ffcc8 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -21,12 +21,11 @@ package org.apache.samza.metrics.reporter
import org.apache.samza.util.{Logging, StreamUtil, Util}
import org.apache.samza.SamzaException
-import org.apache.samza.config.{ApplicationConfig, Config, SystemConfig}
+import org.apache.samza.config.{Config, SystemConfig}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.metrics.MetricsReporterFactory
import org.apache.samza.metrics.MetricsRegistryMap
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index fbaf23d..980b41f 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -250,7 +250,7 @@ public class ContainerStorageManager {
sideInputSystemConsumers =
new SystemConsumers(chooser,
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
sideInputSystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
- SystemConsumers.DEFAULT_POLL_INTERVAL_MS(),
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
+ TaskConfig.DEFAULT_POLL_INTERVAL_MS,
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
}
}
@@ -661,6 +661,7 @@ public class ContainerStorageManager {
getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.init());
// start the checkpointing thread at the commit-ms frequency
+ TaskConfig taskConfig = new TaskConfig(config);
sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new
Runnable() {
@Override
public void run() {
@@ -671,7 +672,7 @@ public class ContainerStorageManager {
sideInputException = Optional.of(e);
}
}
- }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
+ }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
// set the latch to the number of sideInput SSPs
this.sideInputsCaughtUp = new
CountDownLatch(this.sideInputStorageManagers.keySet().size());
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 563a104..b41c245 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -28,14 +28,15 @@ import java.util.HashMap
import java.util.HashSet
import java.util.Queue
import java.util.Set
+
import scala.collection.JavaConverters._
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.util.{Logging, TimerUtil}
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.SamzaException
+import org.apache.samza.config.TaskConfig
object SystemConsumers {
- val DEFAULT_POLL_INTERVAL_MS = 50
val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
val DEFAULT_DROP_SERIALIZATION_ERROR = false
}
@@ -105,7 +106,7 @@ class SystemConsumers (
* with no remaining unprocessed messages, the SystemConsumers will poll for
* it within 50ms of its availability in the stream system.</p>
*/
- val pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
+ val pollIntervalMs: Int = TaskConfig.DEFAULT_POLL_INTERVAL_MS,
/**
* Clock can be used to inject a custom clock when mocking this class in
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index 9b77a8c..c5e102a 100644
---
a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++
b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -20,7 +20,7 @@
package org.apache.samza.system.chooser
import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
+import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfig}
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
import org.apache.samza.system._
import org.apache.samza.util.Logging
@@ -40,7 +40,7 @@ object DefaultChooser extends Logging {
debug("Got batch size: %s" format batchSize)
// Normal streams default to priority 0.
- val defaultPrioritizedStreams = new TaskConfigJava(config)
+ val defaultPrioritizedStreams = new TaskConfig(config)
.getAllInputStreams.asScala
.map((_, 0))
.toMap
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e31d82f..7ee96de 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -30,11 +30,11 @@ import java.net.InetAddress
import java.net.NetworkInterface
import java.util.Random
-
import scala.collection.JavaConverters._
object Util extends Logging {
+ private val FALLBACK_VERSION = "0.0.1"
val Random = new Random
/**
@@ -72,20 +72,29 @@ object Util extends Logging {
def getSamzaVersion(): String = {
Option(this.getClass.getPackage.getImplementationVersion)
.getOrElse({
- warn("Unable to find implementation samza version in jar's meta info.
Defaulting to 0.0.1.")
- "0.0.1"
+ warn("Unable to find implementation samza version in jar's meta info.
Defaulting to %s" format FALLBACK_VERSION)
+ FALLBACK_VERSION
})
}
def getTaskClassVersion(config: Config): String = {
try {
- val taskClass = Option(new ApplicationConfig(config).getAppClass())
- .orElse(new TaskConfig(config).getTaskClass).get
- Class.forName(taskClass).getPackage.getImplementationVersion
+ val appClass = Option(new ApplicationConfig(config).getAppClass)
+ if (appClass.isDefined) {
+ Class.forName(appClass.get).getPackage.getImplementationVersion
+ } else {
+ val taskClass = new TaskConfig(config).getTaskClass
+ if (taskClass.isPresent) {
+ Class.forName(taskClass.get()).getPackage.getImplementationVersion
+ } else {
+ warn("Unable to find app class or task class. Defaulting to %s"
format FALLBACK_VERSION)
+ FALLBACK_VERSION
+ }
+ }
} catch {
case e: Exception => {
- warn("Unable to find implementation version in jar's meta info.
Defaulting to 0.0.1.")
- "0.0.1"
+ warn("Unable to find implementation version in jar's meta info.
Defaulting to %s" format FALLBACK_VERSION)
+ FALLBACK_VERSION
}
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index f620ece..1d0bd88 100644
---
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -24,9 +24,9 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.task.MockStreamTask;
import org.junit.Test;
@@ -45,7 +45,7 @@ public class TestApplicationUtil {
SamzaApplication app = ApplicationUtil.fromConfig(new
MapConfig(configMap));
assertTrue(app instanceof MockStreamApplication);
- configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+ configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
app = ApplicationUtil.fromConfig(new MapConfig(configMap));
assertTrue(app instanceof MockStreamApplication);
}
@@ -57,7 +57,7 @@ public class TestApplicationUtil {
SamzaApplication app = ApplicationUtil.fromConfig(new
MapConfig(configMap));
assertTrue(app instanceof MockTaskApplication);
- configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+ configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
app = ApplicationUtil.fromConfig(new MapConfig(configMap));
assertTrue(app instanceof MockTaskApplication);
}
@@ -65,7 +65,7 @@ public class TestApplicationUtil {
@Test
public void testTaskClassOnly() {
Map<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+ configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
Config config = new MapConfig(configMap);
SamzaApplication app = ApplicationUtil.fromConfig(config);
assertTrue(app instanceof TaskApplication);
@@ -76,7 +76,7 @@ public class TestApplicationUtil {
@Test(expected = ConfigException.class)
public void testEmptyTaskClassOnly() {
Map<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TASK_CLASS(), "");
+ configMap.put(TaskConfig.TASK_CLASS, "");
ApplicationUtil.fromConfig(new MapConfig(configMap));
}
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
new file mode 100644
index 0000000..be3e4d5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.junit.Test;
+
+
+public class TestTaskConfig {
+ @Test
+ public void testGetInputStreams() {
+ Config config = new MapConfig(
+ ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo, kafka.bar,
otherKafka.bar, otherKafka.foo.bar"));
+ Set<SystemStream> expected = ImmutableSet.of(
+ new SystemStream("kafka", "foo"),
+ new SystemStream("kafka", "bar"),
+ new SystemStream("otherKafka", "bar"),
+ new SystemStream("otherKafka", "foo.bar"));
+ assertEquals(expected, new TaskConfig(config).getAllInputStreams());
+
+ // empty string for value
+ MapConfig configEmptyInput = new
MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, ""));
+ assertTrue(new TaskConfig(configEmptyInput).getInputStreams().isEmpty());
+ // config not specified
+ assertTrue(new TaskConfig(new MapConfig()).getInputStreams().isEmpty());
+ }
+
+ @Test
+ public void testGetWindowMs() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "10"));
+ assertEquals(10, new TaskConfig(config).getWindowMs());
+
+ config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "-1"));
+ assertEquals(-1, new TaskConfig(config).getWindowMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_WINDOW_MS, new TaskConfig(new
MapConfig()).getWindowMs());
+ }
+
+ @Test
+ public void testGetCommitMs() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "10"));
+ assertEquals(10, new TaskConfig(config).getCommitMs());
+
+ config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "-1"));
+ assertEquals(-1, new TaskConfig(config).getCommitMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_COMMIT_MS, new TaskConfig(new
MapConfig()).getCommitMs());
+ }
+
+ @Test
+ public void testGetTaskClass() {
+ String taskClass = "some.task.class";
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS,
taskClass));
+ assertEquals(Optional.of(taskClass), new
TaskConfig(config).getTaskClass());
+
+ // config not specified
+ assertFalse(new TaskConfig(new MapConfig()).getTaskClass().isPresent());
+ }
+
+ @Test
+ public void testGetCommandClass() {
+ String commandClass = "some.command.class";
+ String defaultCommandClass = "default.command.class";
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMAND_BUILDER,
commandClass));
+ assertEquals(commandClass, new
TaskConfig(config).getCommandClass(defaultCommandClass));
+
+ // config not specified
+ assertEquals(defaultCommandClass, new TaskConfig(new
MapConfig()).getCommandClass(defaultCommandClass));
+ }
+
+ @Test
+ public void testGetMessageChooserClass() {
+ String messageChooserClassValue = "some.message.chooser.class";
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME,
messageChooserClassValue));
+ assertEquals(messageChooserClassValue, new
TaskConfig(config).getMessageChooserClass());
+
+ // config not specified
+ assertEquals(RoundRobinChooserFactory.class.getName(), new TaskConfig(new
MapConfig()).getMessageChooserClass());
+ }
+
+ @Test
+ public void testGetDropDeserializationErrors() {
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "true"));
+ assertTrue(new TaskConfig(config).getDropDeserializationErrors());
+
+ config = new
MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "false"));
+ assertFalse(new TaskConfig(config).getDropDeserializationErrors());
+
+ // config not specified
+ assertFalse(new TaskConfig(new
MapConfig()).getDropDeserializationErrors());
+ }
+
+ @Test
+ public void testGetDropSerializationErrors() {
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "true"));
+ assertTrue(new TaskConfig(config).getDropSerializationErrors());
+
+ config = new
MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "false"));
+ assertFalse(new TaskConfig(config).getDropSerializationErrors());
+
+ // config not specified
+ assertFalse(new TaskConfig(new MapConfig()).getDropSerializationErrors());
+ }
+
+ @Test
+ public void testGetDropProducerErrors() {
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
+ assertTrue(new TaskConfig(config).getDropProducerErrors());
+
+ config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS,
"false"));
+ assertFalse(new TaskConfig(config).getDropProducerErrors());
+
+ // config not specified
+ assertFalse(new TaskConfig(new MapConfig()).getDropProducerErrors());
+ }
+
+ @Test
+ public void testGetPollIntervalMs() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.POLL_INTERVAL_MS,
"10"));
+ assertEquals(10, new TaskConfig(config).getPollIntervalMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_POLL_INTERVAL_MS, new TaskConfig(new
MapConfig()).getPollIntervalMs());
+ }
+
+ @Test
+ public void testGetIgnoredExceptions() {
+ String ignoredExceptionsValue = "exception0.class, exception1.class";
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.IGNORED_EXCEPTIONS,
ignoredExceptionsValue));
+ assertEquals(Optional.of(ignoredExceptionsValue), new
TaskConfig(config).getIgnoredExceptions());
+
+ // config not specified
+ assertFalse(new TaskConfig(new
MapConfig()).getIgnoredExceptions().isPresent());
+ }
+
+ @Test
+ public void testGetTaskNameGrouperFactory() {
+ String taskNameGrouperFactoryValue = "task.name.grouper.factory.class";
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.GROUPER_FACTORY,
taskNameGrouperFactoryValue));
+ assertEquals(taskNameGrouperFactoryValue, new
TaskConfig(config).getTaskNameGrouperFactory());
+
+ // config not specified
+ assertEquals(GroupByContainerCountFactory.class.getName(),
+ new TaskConfig(new MapConfig()).getTaskNameGrouperFactory());
+ }
+
+ @Test
+ public void testGetMaxConcurrency() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_CONCURRENCY,
"10"));
+ assertEquals(10, new TaskConfig(config).getMaxConcurrency());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_MAX_CONCURRENCY, new TaskConfig(new
MapConfig()).getMaxConcurrency());
+ }
+
+ @Test
+ public void testGetCallbackTimeoutMs() {
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, "10"));
+ assertEquals(10, new TaskConfig(config).getCallbackTimeoutMs());
+
+ config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS,
"-1"));
+ assertEquals(-1, new TaskConfig(config).getCallbackTimeoutMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS, new TaskConfig(new
MapConfig()).getCallbackTimeoutMs());
+ }
+
+ @Test
+ public void testGetAsyncCommit() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT,
"true"));
+ assertTrue(new TaskConfig(config).getAsyncCommit());
+
+ config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT, "false"));
+ assertFalse(new TaskConfig(config).getAsyncCommit());
+
+ // config not specified
+ assertFalse(new TaskConfig(new MapConfig()).getAsyncCommit());
+ }
+
+ @Test
+ public void testGetMaxIdleMs() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_IDLE_MS,
"20"));
+ assertEquals(20, new TaskConfig(config).getMaxIdleMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_MAX_IDLE_MS, new TaskConfig(new
MapConfig()).getMaxIdleMs());
+ }
+
+ @Test
+ public void testGetCheckpointManager() {
+ Config config =
+ new MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY,
MockCheckpointManagerFactory.class.getName()));
+ assertTrue(new TaskConfig(config).getCheckpointManager(null,
getClass().getClassLoader())
+ .get() instanceof MockCheckpointManager);
+
+ Config configEmptyString = new
MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY, ""));
+ assertFalse(new TaskConfig(configEmptyString).getCheckpointManager(null,
getClass().getClassLoader()).isPresent());
+
+ assertFalse(new TaskConfig(new MapConfig()).getCheckpointManager(null,
getClass().getClassLoader()).isPresent());
+ }
+
+ @Test
+ public void testGetBroadcastSystemStreamPartitions() {
+ // no entry for "task.broadcast.inputs"
+ assertEquals(Collections.emptySet(), new TaskConfig(new
MapConfig()).getBroadcastSystemStreamPartitions());
+
+ HashMap<String, String> map = new HashMap<>();
+ map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5,
kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
+ Config config = new MapConfig(map);
+ TaskConfig taskConfig = new TaskConfig(config);
+ Set<SystemStreamPartition> systemStreamPartitionSet =
taskConfig.getBroadcastSystemStreamPartitions();
+
+ HashSet<SystemStreamPartition> expected = new HashSet<>();
+ expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
+ expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(12)));
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(13)));
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(14)));
+ expected.add(new SystemStreamPartition("kafka", "foo.bar", new
Partition(3)));
+ expected.add(new SystemStreamPartition("kafka", "foo.bar", new
Partition(4)));
+ assertEquals(expected, systemStreamPartitionSet);
+
+ map.put("task.broadcast.inputs", "kafka.foo");
+ taskConfig = new TaskConfig(new MapConfig(map));
+ boolean catchCorrectException = false;
+ try {
+ taskConfig.getBroadcastSystemStreamPartitions();
+ } catch (IllegalArgumentException e) {
+ catchCorrectException = true;
+ }
+ assertTrue(catchCorrectException);
+
+ map.put("task.broadcast.inputs",
"kafka.org.apache.events.WhitelistedIps#1-2");
+ taskConfig = new TaskConfig(new MapConfig(map));
+ boolean invalidFormatException = false;
+ try {
+ taskConfig.getBroadcastSystemStreamPartitions();
+ } catch (IllegalArgumentException e) {
+ invalidFormatException = true;
+ }
+ assertTrue(invalidFormatException);
+ }
+
+ @Test
+ public void testGetBroadcastSystemStreams() {
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.BROADCAST_INPUT_STREAMS,
+ "kafka.foo#4, kafka.bar#5, otherKafka.foo#4, otherKafka.foo.bar#5"));
+ Set<SystemStream> expected = ImmutableSet.of(
+ new SystemStream("kafka", "foo"),
+ new SystemStream("kafka", "bar"),
+ new SystemStream("otherKafka", "foo"),
+ new SystemStream("otherKafka", "foo.bar"));
+ assertEquals(expected, new TaskConfig(config).getBroadcastSystemStreams());
+ assertTrue(new TaskConfig(new
MapConfig()).getBroadcastSystemStreams().isEmpty());
+ }
+
+ @Test
+ public void testGetAllInputStreams() {
+ Config config = new MapConfig(ImmutableMap.of(
+ TaskConfig.INPUT_STREAMS, "kafka.foo, otherKafka.bar",
+ TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
+ Set<SystemStream> expected = ImmutableSet.of(
+ new SystemStream("kafka", "foo"),
+ new SystemStream("otherKafka", "bar"),
+ new SystemStream("kafka", "bar"),
+ new SystemStream("otherKafka", "foo"));
+ assertEquals(expected, new TaskConfig(config).getAllInputStreams());
+
+ Config configOnlyBroadcast = new MapConfig(ImmutableMap.of(
+ TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
+ Set<SystemStream> expectedOnlyBroadcast = ImmutableSet.of(
+ new SystemStream("kafka", "bar"),
+ new SystemStream("otherKafka", "foo"));
+ assertEquals(expectedOnlyBroadcast, new
TaskConfig(configOnlyBroadcast).getAllInputStreams());
+
+ Config configOnlyInputs = new
MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo,
otherKafka.bar"));
+ Set<SystemStream> expectedOnlyInputs = ImmutableSet.of(
+ new SystemStream("kafka", "foo"),
+ new SystemStream("otherKafka", "bar"));
+ assertEquals(expectedOnlyInputs, new
TaskConfig(configOnlyInputs).getAllInputStreams());
+
+ assertTrue(new TaskConfig(new MapConfig()).getAllInputStreams().isEmpty());
+ }
+
+ @Test
+ public void testGetShutdownMs() {
+ Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS,
"10"));
+ assertEquals(10, new TaskConfig(config).getShutdownMs());
+
+ // unable to parse value into number
+ config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "not a
number"));
+ assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new
TaskConfig(config).getShutdownMs());
+
+ // config not specified
+ assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new
MapConfig()).getShutdownMs());
+ }
+
+ /**
+ * Used for testing classloading a {@link CheckpointManagerFactory}.
+ */
+ public static class MockCheckpointManagerFactory implements
CheckpointManagerFactory {
+ @Override
+ public CheckpointManager getCheckpointManager(Config config,
MetricsRegistry registry) {
+ return new MockCheckpointManager();
+ }
+ }
+
+ /**
+ * Placeholder class to be returned by {@link MockCheckpointManagerFactory}.
+ */
+ private static class MockCheckpointManager implements CheckpointManager {
+ @Override
+ public void start() { }
+
+ @Override
+ public void register(TaskName taskName) { }
+
+ @Override
+ public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) { }
+
+ @Override
+ public Checkpoint readLastCheckpoint(TaskName taskName) {
+ return null;
+ }
+
+ @Override
+ public void stop() { }
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
deleted file mode 100644
index baf2d4f..0000000
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-public class TestTaskConfigJava {
-
- @Test
- public void testGetBroadcastSystemStreamPartitions() {
- HashMap<String, String> map = new HashMap<String, String>();
- map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5,
kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
- Config config = new MapConfig(map);
-
- TaskConfigJava taskConfig = new TaskConfigJava(config);
- Set<SystemStreamPartition> systemStreamPartitionSet =
taskConfig.getBroadcastSystemStreamPartitions();
-
- HashSet<SystemStreamPartition> expected = new
HashSet<SystemStreamPartition>();
- expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
- expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
- expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(12)));
- expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(13)));
- expected.add(new SystemStreamPartition("kafka", "z-o-o", new
Partition(14)));
- expected.add(new SystemStreamPartition("kafka", "foo.bar", new
Partition(3)));
- expected.add(new SystemStreamPartition("kafka", "foo.bar", new
Partition(4)));
- assertEquals(expected, systemStreamPartitionSet);
-
- map.put("task.broadcast.inputs", "kafka.foo");
- taskConfig = new TaskConfigJava(new MapConfig(map));
- boolean catchCorrectException = false;
- try {
- taskConfig.getBroadcastSystemStreamPartitions();
- } catch (IllegalArgumentException e) {
- catchCorrectException = true;
- }
- assertTrue(catchCorrectException);
-
- map.put("task.broadcast.inputs",
"kafka.org.apache.events.WhitelistedIps#1-2");
- taskConfig = new TaskConfigJava(new MapConfig(map));
- boolean invalidFormatException = false;
- try {
- taskConfig.getBroadcastSystemStreamPartitions();
- } catch (IllegalArgumentException e) {
- invalidFormatException = true;
- }
- assertTrue(invalidFormatException);
- }
-
- @Test
- public void testAutoCommitConfig() {
- // positive values of commit.ms => autoCommit = true
- Config config1 = new MapConfig(ImmutableMap.of("task.commit.ms", "1"));
- assertTrue(new TaskConfig(config1).isAutoCommitEnabled());
-
- // no value for commit.ms => autoCommit = true
- Config config2 = new MapConfig(ImmutableMap.of());
- assertTrue(new TaskConfig(config2).isAutoCommitEnabled());
-
- // A zero value for commit.ms => autoCommit = false
- Config config3 = new MapConfig(ImmutableMap.of("task.commit.ms", "0"));
- assertFalse(new TaskConfig(config3).isAutoCommitEnabled());
-
- // negative value for commit.ms => autoCommit = false
- Config config4 = new MapConfig(ImmutableMap.of("task.commit.ms", "-1"));
- assertFalse(new TaskConfig(config4).isAutoCommitEnabled());
- }
-}
diff --git
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 4077079..9f7644b 100644
---
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -725,7 +725,7 @@ public class TestExecutionPlanner {
assertEquals(1, jobConfigs.size());
// GCD of 8, 16, 1600 and 252 is 4
- assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+ assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
}
@Test
@@ -741,7 +741,7 @@ public class TestExecutionPlanner {
assertEquals(1, jobConfigs.size());
// GCD of 8, 16, 1600 and 252 is 4
- assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+ assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
}
@Test
@@ -755,13 +755,13 @@ public class TestExecutionPlanner {
ExecutionPlan plan = planner.plan(graphSpec);
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
- assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
+ assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS));
}
@Test
public void testTriggerIntervalWhenWindowMsIsConfigured() {
Map<String, String> map = new HashMap<>(config);
- map.put(TaskConfig.WINDOW_MS(), "2000");
+ map.put(TaskConfig.WINDOW_MS, "2000");
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(),
String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
@@ -770,7 +770,7 @@ public class TestExecutionPlanner {
ExecutionPlan plan = planner.plan(graphSpec);
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
- assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+ assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
}
@Test
diff --git
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 0962a14..f60be4f 100644
---
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -30,7 +30,6 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
@@ -75,7 +74,7 @@ public class TestJobNodeConfigurationGenerator extends
ExecutionPlannerTestBase
Config expectedJobConfig = getExpectedJobConfig(mockConfig,
mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
// additional, check the computed window.ms for join
- assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS()));
+ assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS));
Map<String, Serde> deserializedSerdes =
validateAndGetDeserializedSerdes(jobConfig, 5);
validateStreamConfigures(jobConfig, deserializedSerdes);
validateJoinStoreConfigures(jobConfig, deserializedSerdes);
@@ -276,10 +275,10 @@ public class TestJobNodeConfigurationGenerator extends
ExecutionPlannerTestBase
}
}
if (!inputs.isEmpty()) {
- configMap.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+ configMap.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
}
if (!broadcasts.isEmpty()) {
- configMap.put(TaskConfigJava.BROADCAST_INPUT_STREAMS,
Joiner.on(',').join(broadcasts));
+ configMap.put(TaskConfig.BROADCAST_INPUT_STREAMS,
Joiner.on(',').join(broadcasts));
}
return new MapConfig(configMap);
}
@@ -299,8 +298,8 @@ public class TestJobNodeConfigurationGenerator extends
ExecutionPlannerTestBase
assertEquals(expectedConfig.get(JobConfig.JOB_NAME()),
jobConfig.getName().get());
assertEquals(expectedConfig.get(JobConfig.JOB_ID()), jobConfig.getJobId());
assertEquals("testJobGraphJson",
jobConfig.get(JobNodeConfigurationGenerator.CONFIG_INTERNAL_EXECUTION_PLAN));
- assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS()),
jobConfig.get(TaskConfig.INPUT_STREAMS()));
- assertEquals(expectedConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS),
jobConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS));
+ assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS),
jobConfig.get(TaskConfig.INPUT_STREAMS));
+ assertEquals(expectedConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS),
jobConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS));
}
private void validateStreamSerdeConfigure(String streamId, Config config,
Map<String, Serde> deserializedSerdes) {
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
index ca06b2a..0829a82 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
@@ -121,7 +121,7 @@ class TestTaskInstanceExceptionHandler extends
AssertionsForJUnit with MockitoSu
}
private def build(config: Config): TaskInstanceExceptionHandler = {
- TaskInstanceExceptionHandler.apply(this.metrics, config)
+ TaskInstanceExceptionHandler.apply(this.metrics, new TaskConfig(config))
}
/**
diff --git
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index d498b24..8117a00 100644
---
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -249,8 +249,8 @@ class TestJobCoordinator extends FlatSpec with
PrivateMethodTester {
val getMatchedInputStreamPartitions =
PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
val allSSP = JobModelManager invokePrivate
getInputStreamPartitions(config, streamMetadataCache)
- val matchedSSP = JobModelManager invokePrivate
getMatchedInputStreamPartitions(config, streamMetadataCache,
- getClass.getClassLoader)
+ val matchedSSP =
+ JobModelManager invokePrivate getMatchedInputStreamPartitions(config,
streamMetadataCache, getClass.getClassLoader)
assertEquals(matchedSSP, allSSP)
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 037fbb3..d1fce3e 100644
---
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -25,8 +25,8 @@ import java.util.Collections
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.Partition
+import org.apache.samza.config.TaskConfig
import org.apache.samza.serializers._
-import org.apache.samza.startpoint._
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MockMessageChooser
@@ -51,7 +51,7 @@ class TestSystemConsumers {
new SerdeManager, new
SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
+ TaskConfig.DEFAULT_POLL_INTERVAL_MS,
clock = () => now)
consumers.register(systemStreamPartition0, "0")
consumers.register(systemStreamPartition1, "1234")
@@ -75,7 +75,7 @@ class TestSystemConsumers {
// Advance the clock to trigger a new poll even though there are still
// messages.
- now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+ now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
assertEquals(envelope, consumers.choose())
@@ -116,7 +116,7 @@ class TestSystemConsumers {
new SerdeManager, new
SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
+ TaskConfig.DEFAULT_POLL_INTERVAL_MS,
clock = () => now)
consumers.register(systemStreamPartition, "0")
consumers.start
@@ -145,7 +145,7 @@ class TestSystemConsumers {
assertNull(consumers.choose())
// Increase clock interval.
- now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+ now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
// We get two messages now.
assertEquals(envelope, consumers.choose())
@@ -316,7 +316,7 @@ class TestSystemConsumers {
systemAdmins, new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
- SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+ TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
consumers.register(systemStreamPartition1, "0")
consumers.register(systemStreamPartition2, "0")
@@ -369,7 +369,7 @@ class TestSystemConsumers {
systemAdmins, new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
- SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+ TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
consumers.register(systemStreamPartition1, "0")
}
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 7d093c3..deb8056 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory,
SystemProducer}
@@ -80,12 +79,13 @@ class KafkaSystemFactory extends SystemFactory with Logging
{
// for us.
info("Creating kafka producer for system %s, producerClientId %s"
format(systemName, clientId))
+ val taskConfig = new TaskConfig(config)
new KafkaSystemProducer(
systemName,
new ExponentialSleepStrategy(initialDelayMs =
producerConfig.reconnectIntervalMs),
getProducer,
metrics,
- dropProducerExceptions = config.getDropProducerErrors)
+ dropProducerExceptions = taskConfig.getDropProducerErrors)
}
def getAdmin(systemName: String, config: Config): SystemAdmin = {
diff --git
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index d48136c..61b7475 100644
---
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -279,7 +279,7 @@ public class StreamAppender extends AppenderSkeleton {
throw new SamzaException("can not read the config", e);
}
// Make system producer drop producer errors for StreamAppender
- config = new MapConfig(config,
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
+ config = new MapConfig(config,
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
return config;
}
diff --git
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index a719317..b47ddab 100644
---
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -300,7 +300,7 @@ public class StreamAppender extends AbstractAppender {
throw new SamzaException("can not read the config", e);
}
// Make system producer drop producer errors for StreamAppender
- config = new MapConfig(config,
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
+ config = new MapConfig(config,
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
return config;
}
diff --git
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 0434815..181f0f7 100755
---
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -317,7 +317,7 @@ public class SamzaExecutor implements SqlExecutor {
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId);
staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId));
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- staticConfigs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ staticConfigs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
String configIOResolverDomain =
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index 627dc65..364d0a9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -82,7 +82,7 @@ public class SamzaSqlTestConfig {
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- staticConfigs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ staticConfigs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
String configIOResolverDomain =
diff --git
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 6737488..2f2e74e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -110,7 +110,7 @@ public class TestRunner {
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(),
InMemoryMetadataStoreFactory.class.getCanonicalName());
- configs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
// Changing the base directory for non-changelog stores used by Samza
application to separate the
// on-disk store locations for concurrently executing tests
configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
@@ -131,7 +131,7 @@ public class TestRunner {
private TestRunner(Class taskClass) {
this();
Preconditions.checkNotNull(taskClass);
- configs.put(TaskConfig.TASK_CLASS(), taskClass.getName());
+ configs.put(TaskConfig.TASK_CLASS, taskClass.getName());
this.app = new LegacyTaskApplication(taskClass.getName());
}
@@ -373,11 +373,11 @@ public class TestRunner {
Map<Integer, Iterable<StreamMessageType>> partitionData) {
String systemName = descriptor.getSystemName();
String streamName = (String)
descriptor.getPhysicalName().orElse(descriptor.getStreamId());
- if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
- configs.put(TaskConfig.INPUT_STREAMS(),
- configs.get(TaskConfig.INPUT_STREAMS()).concat("," + systemName +
"." + streamName));
+ if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
+ configs.put(TaskConfig.INPUT_STREAMS,
+ configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "."
+ streamName));
} else {
- configs.put(TaskConfig.INPUT_STREAMS(), systemName + "." + streamName);
+ configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
}
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor)
descriptor.getSystemDescriptor();
imsd.withInMemoryScope(this.inMemoryScope);
diff --git
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 777d219..81ac963 100644
---
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -40,7 +40,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.context.Context;
import org.apache.samza.coordinator.JobCoordinator;
@@ -197,7 +197,7 @@ public class TestZkStreamProcessorBase extends
IntegrationTestHarness {
configs.put("task.name.grouper.factory",
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.zk.ZkJobCoordinatorFactory");
- configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+ configs.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index b571ae0..054556e 100644
---
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -81,7 +81,7 @@ public class EndOfStreamIntegrationTest extends
IntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-eos-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
configs.put("systems.kafka.samza.factory",
"org.apache.samza.system.kafka.KafkaSystemFactory");
configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
diff --git
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 63dc254..7431bd7 100644
---
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.controlmessages;
+import org.apache.samza.config.TaskConfig;
import scala.collection.JavaConverters;
import java.util.ArrayList;
@@ -34,7 +35,6 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskName;
@@ -131,7 +131,7 @@ public class WatermarkIntegrationTest extends
IntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
configs.put("systems.kafka.samza.factory",
"org.apache.samza.system.kafka.KafkaSystemFactory");
configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
index 83b2b63..268f07c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -55,9 +55,9 @@ public class FaultInjectionTest extends
StreamApplicationIntegrationTestHarness
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.zk.ZkJobCoordinatorFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
- configs.put(TaskConfig.GROUPER_FACTORY(),
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+ configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views");
- configs.put(TaskConfig.INPUT_STREAMS(), "kafka.page-views");
+ configs.put(TaskConfig.INPUT_STREAMS, "kafka.page-views");
configs.put(ZkConfig.ZK_CONNECT, zkConnect());
configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000");
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index 4afff92..4f5d510 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -82,7 +82,7 @@ public class TestAsyncFlatMap extends IntegrationTestHarness {
@Test(expected = SamzaException.class)
public void testProcessingFutureCompletesAfterTaskTimeout() {
Map<String, String> configs = new HashMap<>();
- configs.put(TaskConfig.CALLBACK_TIMEOUT_MS(), "100");
+ configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
configs.put(PROCESS_JITTER, "200");
runTest(PAGE_VIEWS, configs);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index fc5e75a..cb81b89 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -83,7 +83,7 @@ public class TestRepartitionJoinWindowApp extends
StreamApplicationIntegrationTe
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
- configs.put(TaskConfig.GROUPER_FACTORY(),
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+ configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "false");
configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY,
inputTopicName1);
configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY,
inputTopicName2);
@@ -112,7 +112,7 @@ public class TestRepartitionJoinWindowApp extends
StreamApplicationIntegrationTe
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
- configs.put(TaskConfig.GROUPER_FACTORY(),
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+ configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "true");
configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY,
inputTopicName1);
configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY,
inputTopicName2);
@@ -161,7 +161,7 @@ public class TestRepartitionJoinWindowApp extends
StreamApplicationIntegrationTe
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
- configs.put(TaskConfig.GROUPER_FACTORY(),
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+ configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 2f08fed..82533a0 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -62,7 +62,7 @@ public class TestRepartitionWindowApp extends
StreamApplicationIntegrationTestHa
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
- configs.put(TaskConfig.GROUPER_FACTORY(),
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+ configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
// run the application
runApplication(new RepartitionWindowApp(), APP_NAME, configs);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 0c43674..7ad0632 100644
---
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -49,7 +49,6 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
@@ -197,12 +196,12 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
String coordinatorSystemName = "coordinatorSystem";
Map<String, String> config = new HashMap<>();
config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
- config.put(TaskConfig.INPUT_STREAMS(),
Joiner.on(',').join(inputSystemStreams));
+ config.put(TaskConfig.INPUT_STREAMS,
Joiner.on(',').join(inputSystemStreams));
config.put(JobConfig.JOB_DEFAULT_SYSTEM(),
TestZkLocalApplicationRunner.TEST_SYSTEM);
- config.put(TaskConfig.IGNORED_EXCEPTIONS(), "*");
+ config.put(TaskConfig.IGNORED_EXCEPTIONS, "*");
config.put(ZkConfig.ZK_CONNECT, zkConnect());
config.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY);
- config.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY);
+ config.put(TaskConfig.GROUPER_FACTORY, TEST_TASK_GROUPER_FACTORY);
config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
TEST_JOB_COORDINATOR_FACTORY);
config.put(ApplicationConfig.APP_NAME, appName);
config.put(ApplicationConfig.APP_ID, appId);
@@ -210,8 +209,8 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
config.put(String.format("systems.%s.samza.factory",
TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY);
config.put(JobConfig.JOB_NAME(), appName);
config.put(JobConfig.JOB_ID(), appId);
- config.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
- config.put(TaskConfig.DROP_PRODUCER_ERRORS(), "true");
+ config.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+ config.put(TaskConfig.DROP_PRODUCER_ERRORS, "true");
config.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
config.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000");
config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true");
@@ -609,7 +608,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic),
testStreamAppName, testStreamAppId);
- configMap.put(TaskConfig.SHUTDOWN_MS(), "0");
+ configMap.put(TaskConfig.TASK_SHUTDOWN_MS, "0");
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
Config applicationConfig1 = new MapConfig(configMap);
@@ -642,7 +641,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
// Reset the task shutdown ms for 3rd application to give it ample time to
shutdown cleanly
- configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS);
+ configMap.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
Config applicationConfig3 = new MapConfig(configMap);
CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -951,7 +950,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
TaskApplication taskApplication = new TestTaskApplication(TEST_SYSTEM,
inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, shutdownLatch);
MapConfig taskApplicationConfig = new
MapConfig(ImmutableList.of(applicationConfig1,
- ImmutableMap.of(TaskConfig.MAX_CONCURRENCY(), "1",
JobConfig.SSP_GROUPER_FACTORY(),
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
+ ImmutableMap.of(TaskConfig.MAX_CONCURRENCY, "1",
JobConfig.SSP_GROUPER_FACTORY(),
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
ApplicationRunner appRunner =
ApplicationRunners.getApplicationRunner(taskApplication, taskApplicationConfig);
// Run the application.
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index c3f0ba5..7760833 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -265,7 +265,7 @@ public class TestLocalTableEndToEnd extends
IntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-table-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
// For intermediate streams
configs.put("systems.kafka.samza.factory",
"org.apache.samza.system.kafka.KafkaSystemFactory");
diff --git
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 8982682..1263d46 100644
---
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -44,6 +44,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
import org.apache.samza.task._
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.junit.Assert._
import scala.collection.JavaConverters._
@@ -273,14 +274,14 @@ class StreamTaskTestUtil {
val jobModel = new JobModel(mapConfig, containers)
jobModel.maxChangeLogStreamPartitions = 1
- val taskConfig = new TaskConfigJava(jobModel.getConfig)
- val checkpointManager = Option(taskConfig.getCheckpointManager(new
MetricsRegistryMap(), getClass.getClassLoader))
- checkpointManager match {
- case Some(checkpointManager) => {
- checkpointManager.createResources
- checkpointManager.stop
- }
- case _ => assert(checkpointManager != null, "No checkpoint manager
factory configured")
+ val taskConfig = new TaskConfig(jobModel.getConfig)
+ val checkpointManagerOption =
JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new
MetricsRegistryMap(),
+ getClass.getClassLoader)).toOption
+ checkpointManagerOption match {
+ case Some(checkpointManager) =>
+ checkpointManager.createResources()
+ checkpointManager.stop()
+ case _ => throw new ConfigException("No checkpoint manager factory
configured")
}
ChangelogStreamManager.createChangelogStreams(jobModel.getConfig,
jobModel.maxChangeLogStreamPartitions)
diff --git
a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index ab60156..f26d691 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -116,7 +116,7 @@ public class SamzaSqlConsole {
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- staticConfigs.put(TaskConfig.GROUPER_FACTORY(),
SingleContainerGrouperFactory.class.getName());
+ staticConfigs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
String configIOResolverDomain =
diff --git
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index 683ced4..c7703b0 100644
---
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -70,7 +70,7 @@ public class SystemConsumerWithSamzaBench extends
AbstractSamzaBench {
props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS,
streamId),
Joiner.on(",").join(partitions));
- props.put(TaskConfig.GROUPER_FACTORY(),
ConfigBasedSspGrouperFactory.class.getName());
+ props.put(TaskConfig.GROUPER_FACTORY,
ConfigBasedSspGrouperFactory.class.getName());
}
public void start() throws IOException, InterruptedException {