This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e642d  SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java  
(#1039)
38e642d is described below

commit 38e642d330acc6d08ab1f82580bb712f88ce7b7e
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Jun 6 13:33:35 2019 -0700

    SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java  (#1039)
    
    * SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java
    * moving constants to TaskConfigJava
    * renaming TaskConfigJava to TaskConfig and modifying visibility on some 
constants
    * update private method access in TestJobCoordinator
    * clean up unused methods in TaskConfig
    * update javadocs, use Java Optional, move some defaults to TaskConfig
    * post-merge fixes
    * test and import fixes
    * fix javadoc, clean up if-else check, fix format
---
 .../samza/system/kinesis/KinesisSystemFactory.java |   4 +-
 .../samza/coordinator/AzureJobCoordinator.java     |   3 +-
 .../apache/samza/application/ApplicationUtil.java  |   7 +-
 .../clustermanager/ClusterBasedJobCoordinator.java |   4 +-
 .../apache/samza/config/DefaultChooserConfig.java  |   8 +-
 .../java/org/apache/samza/config/TaskConfig.java   | 299 +++++++++++++++++
 .../org/apache/samza/config/TaskConfigJava.java    | 178 ----------
 .../org/apache/samza/container/RunLoopFactory.java |  14 +-
 .../stream/AllSspToSingleTaskGrouperFactory.java   |   4 +-
 .../container/grouper/stream/GroupByPartition.java |   4 +-
 .../stream/GroupBySystemStreamPartition.java       |   4 +-
 .../container/grouper/stream/SSPGrouperProxy.java  |   4 +-
 .../samza/coordinator/MetadataResourceUtil.java    |   6 +-
 .../execution/JobNodeConfigurationGenerator.java   |   9 +-
 .../org/apache/samza/execution/JobPlanner.java     |   6 +-
 .../org/apache/samza/execution/StreamManager.java  |  13 +-
 .../apache/samza/processor/StreamProcessor.java    |   4 +-
 .../org/apache/samza/util/DiagnosticsUtil.java     |   4 +-
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |   4 +-
 .../apache/samza/checkpoint/CheckpointTool.scala   |  15 +-
 .../apache/samza/config/RegExTopicGenerator.scala  |   8 +-
 .../scala/org/apache/samza/config/TaskConfig.scala | 157 ---------
 .../apache/samza/container/SamzaContainer.scala    |  31 +-
 .../container/TaskInstanceExceptionHandler.scala   |  15 +-
 .../apache/samza/coordinator/JobModelManager.scala |  11 +-
 .../apache/samza/job/local/ProcessJobFactory.scala |   8 +-
 .../apache/samza/job/local/ThreadJobFactory.scala  |   2 +-
 .../reporter/MetricsSnapshotReporterFactory.scala  |   3 +-
 .../samza/storage/ContainerStorageManager.java     |   5 +-
 .../org/apache/samza/system/SystemConsumers.scala  |   5 +-
 .../samza/system/chooser/DefaultChooser.scala      |   4 +-
 .../main/scala/org/apache/samza/util/Util.scala    |  25 +-
 .../samza/application/TestApplicationUtil.java     |  10 +-
 .../org/apache/samza/config/TestTaskConfig.java    | 367 +++++++++++++++++++++
 .../apache/samza/config/TestTaskConfigJava.java    |  93 ------
 .../samza/execution/TestExecutionPlanner.java      |  10 +-
 .../TestJobNodeConfigurationGenerator.java         |  11 +-
 .../TestTaskInstanceExceptionHandler.scala         |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala     |   4 +-
 .../apache/samza/system/TestSystemConsumers.scala  |  14 +-
 .../samza/system/kafka/KafkaSystemFactory.scala    |   4 +-
 .../apache/samza/logging/log4j/StreamAppender.java |   2 +-
 .../samza/logging/log4j2/StreamAppender.java       |   2 +-
 .../samza/sql/client/impl/SamzaExecutor.java       |   2 +-
 .../apache/samza/sql/util/SamzaSqlTestConfig.java  |   2 +-
 .../apache/samza/test/framework/TestRunner.java    |  12 +-
 .../samza/processor/TestZkStreamProcessorBase.java |   4 +-
 .../EndOfStreamIntegrationTest.java                |   2 +-
 .../controlmessages/WatermarkIntegrationTest.java  |   4 +-
 .../samza/test/framework/FaultInjectionTest.java   |   4 +-
 .../samza/test/operator/TestAsyncFlatMap.java      |   2 +-
 .../operator/TestRepartitionJoinWindowApp.java     |   6 +-
 .../test/operator/TestRepartitionWindowApp.java    |   2 +-
 .../processor/TestZkLocalApplicationRunner.java    |  17 +-
 .../samza/test/table/TestLocalTableEndToEnd.java   |   2 +-
 .../test/integration/StreamTaskTestUtil.scala      |  17 +-
 .../org/apache/samza/tools/SamzaSqlConsole.java    |   2 +-
 .../benchmark/SystemConsumerWithSamzaBench.java    |   2 +-
 58 files changed, 850 insertions(+), 617 deletions(-)

diff --git 
a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
 
b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
index 558e871..15b44bc 100644
--- 
a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
+++ 
b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import 
org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemAdmin;
@@ -69,7 +69,7 @@ public class KinesisSystemFactory implements SystemFactory {
     }
 
     // Kinesis streams cannot be configured as broadcast streams
-    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    TaskConfig taskConfig = new TaskConfig(config);
     if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> 
system.equals(ss.getSystem()))) {
       throw new ConfigException("Kinesis streams cannot be configured as 
broadcast streams.");
     }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 8c4afdc..db082e3 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -271,7 +271,8 @@ public class AzureJobCoordinator implements JobCoordinator {
    */
   private Set<SystemStreamPartition> getInputStreamPartitions() {
     TaskConfig taskConfig = new TaskConfig(config);
-    scala.collection.immutable.Set<SystemStream> inputSystemStreams = 
taskConfig.getInputStreams();
+    scala.collection.immutable.Set<SystemStream> inputSystemStreams =
+        
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams()).asScala().toSet();
 
     // Get the set of partitions for each SystemStream from the stream metadata
     Set<SystemStreamPartition>
diff --git 
a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java 
b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index f779619..edcb3ae 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.application;
 
+import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.TaskConfig;
-import scala.Option;
 
 
 /**
@@ -52,8 +52,9 @@ public class ApplicationUtil {
       }
     }
     // no app.class defined. It has to be a legacy application with task.class 
configuration
-    Option<String> taskClassOption = new TaskConfig(config).getTaskClass();
-    if (!taskClassOption.isDefined() || 
!StringUtils.isNotBlank(taskClassOption.getOrElse(null))) {
+    TaskConfig taskConfig = new TaskConfig(config);
+    Optional<String> taskClassOption = taskConfig.getTaskClass();
+    if (!taskClassOption.isPresent() || 
StringUtils.isBlank(taskClassOption.get())) {
       // no task.class defined either. This is wrong.
       throw new ConfigException("Legacy task applications must set a non-empty 
task.class in configuration.");
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index bfd036c..7338765 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -33,7 +33,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
@@ -320,7 +320,7 @@ public class ClusterBasedJobCoordinator {
 
   private Optional<StreamPartitionCountMonitor> 
getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfigJava(config).getAllInputStreams();
+    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfig(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index ff344c9..c753b4e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -32,12 +32,12 @@ import org.apache.samza.system.SystemStream;
 public class DefaultChooserConfig extends MapConfig {
   private static final String BATCH_SIZE = "task.consumer.batch.size";
 
-  private final TaskConfigJava taskConfigJava;
+  private final TaskConfig taskConfig;
   private final StreamConfig streamConfig;
 
   public DefaultChooserConfig(Config config) {
     super(config);
-    taskConfigJava = new TaskConfigJava(config);
+    taskConfig = new TaskConfig(config);
     streamConfig = new StreamConfig(config);
   }
 
@@ -53,7 +53,7 @@ public class DefaultChooserConfig extends MapConfig {
    */
   public Set<SystemStream> getBootstrapStreams() {
     Set<SystemStream> bootstrapInputs = new HashSet<>();
-    Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+    Set<SystemStream> allInputs = taskConfig.getAllInputStreams();
     for (SystemStream systemStream : allInputs) {
       if (streamConfig.getBootstrapEnabled(systemStream)) {
         bootstrapInputs.add(systemStream);
@@ -71,7 +71,7 @@ public class DefaultChooserConfig extends MapConfig {
    *          were not explicitly configured with a priority are not returned.
    */
   public Map<SystemStream, Integer> getPriorityStreams() {
-    Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+    Set<SystemStream> allInputs = taskConfig.getAllInputStreams();
 
     Map<SystemStream, Integer> priorityStreams = new HashMap<>();
     for (SystemStream systemStream : allInputs) {
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
new file mode 100644
index 0000000..7c02054
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.StreamUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskConfig extends MapConfig {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(TaskConfig.class);
+
+  // comma-separated list of system-streams
+  public static final String INPUT_STREAMS = "task.inputs";
+  // window period in milliseconds
+  public static final String WINDOW_MS = "task.window.ms";
+  static final long DEFAULT_WINDOW_MS = -1L;
+  // commit period in milliseconds
+  public static final String COMMIT_MS = "task.commit.ms";
+  static final long DEFAULT_COMMIT_MS = 60000L;
+  // how long to wait for a clean shutdown
+  public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+  static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
+  // legacy config for specifying task class; replaced by SamzaApplication and 
app.class
+  public static final String TASK_CLASS = "task.class";
+  // command builder to use for launching a Samza job
+  public static final String COMMAND_BUILDER = "task.command.class";
+  // message chooser for controlling stream consumption
+  public static final String MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class";
+  // define whether to drop the messages or not when deserialization fails
+  public static final String DROP_DESERIALIZATION_ERRORS = 
"task.drop.deserialization.errors";
+  // define whether to drop the messages or not when serialization fails
+  public static final String DROP_SERIALIZATION_ERRORS = 
"task.drop.serialization.errors";
+  // whether to ignore producer errors and drop the messages that failed to 
send
+  public static final String DROP_PRODUCER_ERRORS = 
"task.drop.producer.errors";
+  // exceptions to ignore in process and window
+  public static final String IGNORED_EXCEPTIONS = "task.ignored.exceptions";
+  // class name for task grouper
+  public static final String GROUPER_FACTORY = "task.name.grouper.factory";
+  // max number of messages to process concurrently
+  public static final String MAX_CONCURRENCY = "task.max.concurrency";
+  static final int DEFAULT_MAX_CONCURRENCY = 1;
+  // timeout for triggering a callback
+  public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
+  static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+  // enable async commit
+  public static final String ASYNC_COMMIT = "task.async.commit";
+  // maximum time to wait for a task worker to complete when there are no new 
messages to handle
+  public static final String MAX_IDLE_MS = "task.max.idle.ms";
+  static final long DEFAULT_MAX_IDLE_MS = 10L;
+  /**
+   * Samza's container polls for more messages under two conditions. The first
+   * condition arises when there are simply no remaining buffered messages to
+   * process for any input SystemStreamPartition. The second condition arises
+   * when some input SystemStreamPartitions have empty buffers, but some do
+   * not. In the latter case, a polling interval is defined to determine how
+   * often to refresh the empty SystemStreamPartition buffers. By default,
+   * this interval is 50ms, which means that any empty SystemStreamPartition
+   * buffer will be refreshed at least every 50ms. A higher value here means
+   * that empty SystemStreamPartitions will be refreshed less often, which
+   * means more latency is introduced, but less CPU and network will be used.
+   * Decreasing this value means that empty SystemStreamPartitions are
+   * refreshed more frequently, thereby introducing less latency, but
+   * increasing CPU and network utilization.
+   */
+  public static final String POLL_INTERVAL_MS = "task.poll.interval.ms";
+  public static final int DEFAULT_POLL_INTERVAL_MS = 50;
+  // broadcast streams consumed by all tasks. e.g. kafka.foo#1
+  public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
+  private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
+  private static final String BROADCAST_STREAM_RANGE_PATTERN = 
"^\\[[\\d]+\\-[\\d]+\\]$";
+  public static final String CHECKPOINT_MANAGER_FACTORY = 
"task.checkpoint.factory";
+
+  public TaskConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Get the input streams, not including the broadcast streams. Use {@link 
#getAllInputStreams()} to also get the
+   * broadcast streams.
+   */
+  public Set<SystemStream> getInputStreams() {
+    Optional<String> inputStreams = Optional.ofNullable(get(INPUT_STREAMS));
+    if (!inputStreams.isPresent() || inputStreams.get().isEmpty()) {
+      return Collections.emptySet();
+    } else {
+      return Stream.of(inputStreams.get().split(","))
+          .map(systemStreamNames -> 
StreamUtil.getSystemStreamFromNames(systemStreamNames.trim()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  public long getWindowMs() {
+    return getLong(WINDOW_MS, DEFAULT_WINDOW_MS);
+  }
+
+  public long getCommitMs() {
+    return getLong(COMMIT_MS, DEFAULT_COMMIT_MS);
+  }
+
+  public Optional<String> getTaskClass() {
+    return Optional.ofNullable(get(TASK_CLASS));
+  }
+
+  public String getCommandClass(String defaultCommandClass) {
+    return get(COMMAND_BUILDER, defaultCommandClass);
+  }
+
+  public String getMessageChooserClass() {
+    return 
Optional.ofNullable(get(MESSAGE_CHOOSER_CLASS_NAME)).orElse(RoundRobinChooserFactory.class.getName());
+  }
+
+  public boolean getDropDeserializationErrors() {
+    return getBoolean(DROP_DESERIALIZATION_ERRORS, false);
+  }
+
+  public boolean getDropSerializationErrors() {
+    return getBoolean(DROP_SERIALIZATION_ERRORS, false);
+  }
+
+  public boolean getDropProducerErrors() {
+    return getBoolean(DROP_PRODUCER_ERRORS, false);
+  }
+
+  public int getPollIntervalMs() {
+    return getInt(POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
+  }
+
+  public Optional<String> getIgnoredExceptions() {
+    return Optional.ofNullable(get(IGNORED_EXCEPTIONS));
+  }
+
+  public String getTaskNameGrouperFactory() {
+    Optional<String> taskNameGrouperFactory = 
Optional.ofNullable(get(GROUPER_FACTORY));
+    if (taskNameGrouperFactory.isPresent()) {
+      return taskNameGrouperFactory.get();
+    } else {
+      LOGGER.info(String.format("No %s configuration, using %s", 
GROUPER_FACTORY,
+          GroupByContainerCountFactory.class.getName()));
+      return GroupByContainerCountFactory.class.getName();
+    }
+  }
+
+  public int getMaxConcurrency() {
+    return getInt(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
+  }
+
+  public long getCallbackTimeoutMs() {
+    return getLong(CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
+  }
+
+  public boolean getAsyncCommit() {
+    return getBoolean(ASYNC_COMMIT, false);
+  }
+
+  public long getMaxIdleMs() {
+    return getLong(MAX_IDLE_MS, DEFAULT_MAX_IDLE_MS);
+  }
+
+  /**
+   * Create the checkpoint manager
+   *
+   * @param metricsRegistry Registry of metrics to use. Can be null if not using metrics.
+   * @param classLoader ClassLoader used to load the configured checkpoint manager factory class.
+   * @return CheckpointManager object if checkpoint manager factory is configured, otherwise empty.
+   */
+  public Optional<CheckpointManager> getCheckpointManager(MetricsRegistry 
metricsRegistry, ClassLoader classLoader) {
+    return Optional.ofNullable(get(CHECKPOINT_MANAGER_FACTORY))
+        .filter(StringUtils::isNotBlank)
+        .map(checkpointManagerFactoryName -> 
ReflectionUtil.getObj(classLoader, checkpointManagerFactoryName,
+            CheckpointManagerFactory.class).getCheckpointManager(this, 
metricsRegistry));
+  }
+
+  /**
+   * Get the systemStreamPartitions of the broadcast stream. Specifying
+   * one partition for one stream or a range of the partitions for one
+   * stream is allowed.
+   *
+   * @return a Set of SystemStreamPartitions
+   */
+  public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
+    HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<>();
+    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, 
Collections.emptyList());
+
+    for (String systemStreamPartition : systemStreamPartitions) {
+      int hashPosition = systemStreamPartition.indexOf("#");
+      if (hashPosition == -1) {
+        throw new IllegalArgumentException("incorrect format in " + 
systemStreamPartition
+            + ". Broadcast stream names should be in the form 
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+      } else {
+        String systemStreamName = systemStreamPartition.substring(0, 
hashPosition);
+        String partitionSegment = systemStreamPartition.substring(hashPosition 
+ 1);
+        SystemStream systemStream = 
StreamUtil.getSystemStreamFromNames(systemStreamName);
+
+        if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
+          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, 
new Partition(Integer.valueOf(partitionSegment))));
+        } else {
+          if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, 
partitionSegment)) {
+            int partitionStart = Integer.valueOf(partitionSegment.substring(1, 
partitionSegment.lastIndexOf("-")));
+            int partitionEnd = 
Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 
1, partitionSegment.indexOf("]")));
+            if (partitionStart > partitionEnd) {
+              LOGGER.warn("The starting partition in stream " + 
systemStream.toString() + " is bigger than the ending Partition. No partition 
is added");
+            }
+            for (int i = partitionStart; i <= partitionEnd; i++) {
+              systemStreamPartitionSet.add(new 
SystemStreamPartition(systemStream, new Partition(i)));
+            }
+          } else {
+            throw new IllegalArgumentException("incorrect format in " + 
systemStreamPartition
+                + ". Broadcast stream names should be in the form 
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+          }
+        }
+      }
+    }
+    return systemStreamPartitionSet;
+  }
+
+  /**
+   * Get the SystemStreams for the configured broadcast streams.
+   *
+   * @return the set of SystemStreams for which there are broadcast stream 
SSPs configured.
+   */
+  public Set<SystemStream> getBroadcastSystemStreams() {
+    Set<SystemStream> broadcastSS = new HashSet<>();
+    Set<SystemStreamPartition> broadcastSSPs = 
getBroadcastSystemStreamPartitions();
+    for (SystemStreamPartition bssp : broadcastSSPs) {
+      broadcastSS.add(bssp.getSystemStream());
+    }
+    return Collections.unmodifiableSet(broadcastSS);
+  }
+
+  /**
+   * Get the SystemStreams for the configured input and broadcast streams.
+   *
+   * @return the set of SystemStreams for both standard inputs and broadcast 
stream inputs.
+   */
+  public Set<SystemStream> getAllInputStreams() {
+    Set<SystemStream> allInputSS = new HashSet<>();
+
+    allInputSS.addAll(getInputStreams());
+    allInputSS.addAll(getBroadcastSystemStreams());
+
+    return Collections.unmodifiableSet(allInputSS);
+  }
+
+  /**
+   * Returns a value indicating how long to wait for the tasks to shutdown
+   * If the value is not defined in the config or if it does not parse correctly, we return the default value -
+   * {@value #DEFAULT_TASK_SHUTDOWN_MS}
+   *
+   * @return Long value indicating how long to wait for all the tasks to 
shutdown
+   */
+  public long getShutdownMs() {
+    String shutdownMs = get(TASK_SHUTDOWN_MS);
+    try {
+      return Long.parseLong(shutdownMs);
+    } catch (NumberFormatException nfe) {
+      LOGGER.warn(String.format(
+          "Unable to parse user-configure value for %s - %s. Using default 
value %d",
+          TASK_SHUTDOWN_MS,
+          shutdownMs,
+          DEFAULT_TASK_SHUTDOWN_MS));
+      return DEFAULT_TASK_SHUTDOWN_MS;
+    }
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
deleted file mode 100644
index 6ef41e9..0000000
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.Partition;
-import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.checkpoint.CheckpointManagerFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.ReflectionUtil;
-import org.apache.samza.util.StreamUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-
-
-public class TaskConfigJava extends MapConfig {
-  // Task Configs
-  public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
-  public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
-
-  // broadcast streams consumed by all tasks. e.g. kafka.foo#1
-  public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
-  private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
-  private static final String BROADCAST_STREAM_RANGE_PATTERN = 
"^\\[[\\d]+\\-[\\d]+\\]$";
-
-  // class name to use when sending offset checkpoints
-  public static final String CHECKPOINT_MANAGER_FACTORY = 
"task.checkpoint.factory";
-
-  public static final Logger LOGGER = 
LoggerFactory.getLogger(TaskConfigJava.class);
-
-  public TaskConfigJava(Config config) {
-    super(config);
-  }
-
-  /**
-   * Get the name of the checkpoint manager factory
-   *
-   * @return Name of checkpoint manager factory
-   */
-  public String getCheckpointManagerFactoryName() {
-    return get(CHECKPOINT_MANAGER_FACTORY, null);
-  }
-
-  /**
-   * Create the checkpoint manager
-   *
-   * @param metricsRegistry Registry of metrics to use. Can be null if not 
using metrics.
-   * @return CheckpointManager object if checkpoint manager factory is 
configured, otherwise null.
-   */
-  public CheckpointManager getCheckpointManager(MetricsRegistry 
metricsRegistry, ClassLoader classLoader) {
-    // Initialize checkpoint streams during job coordination
-    String checkpointManagerFactoryName = getCheckpointManagerFactoryName();
-    if (StringUtils.isNotBlank(checkpointManagerFactoryName)) {
-      return ReflectionUtil.getObj(classLoader, checkpointManagerFactoryName, 
CheckpointManagerFactory.class)
-          .getCheckpointManager(this, metricsRegistry);
-    }
-    return null;
-  }
-
-  /**
-   * Get the systemStreamPartitions of the broadcast stream. Specifying
-   * one partition for one stream or a range of the partitions for one
-   * stream is allowed.
-   *
-   * @return a Set of SystemStreamPartitions
-   */
-  public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
-    HashSet<SystemStreamPartition> systemStreamPartitionSet = new 
HashSet<SystemStreamPartition>();
-    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, 
Collections.<String>emptyList());
-
-    for (String systemStreamPartition : systemStreamPartitions) {
-      int hashPosition = systemStreamPartition.indexOf("#");
-      if (hashPosition == -1) {
-        throw new IllegalArgumentException("incorrect format in " + 
systemStreamPartition
-            + ". Broadcast stream names should be in the form 
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
-      } else {
-        String systemStreamName = systemStreamPartition.substring(0, 
hashPosition);
-        String partitionSegment = systemStreamPartition.substring(hashPosition 
+ 1);
-        SystemStream systemStream = 
StreamUtil.getSystemStreamFromNames(systemStreamName);
-
-        if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
-          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, 
new Partition(Integer.valueOf(partitionSegment))));
-        } else {
-          if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, 
partitionSegment)) {
-            int partitionStart = Integer.valueOf(partitionSegment.substring(1, 
partitionSegment.lastIndexOf("-")));
-            int partitionEnd = 
Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 
1, partitionSegment.indexOf("]")));
-            if (partitionStart > partitionEnd) {
-              LOGGER.warn("The starting partition in stream " + 
systemStream.toString() + " is bigger than the ending Partition. No partition 
is added");
-            }
-            for (int i = partitionStart; i <= partitionEnd; i++) {
-              systemStreamPartitionSet.add(new 
SystemStreamPartition(systemStream, new Partition(i)));
-            }
-          } else {
-            throw new IllegalArgumentException("incorrect format in " + 
systemStreamPartition
-                + ". Broadcast stream names should be in the form 
'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
-          }
-        }
-      }
-    }
-    return systemStreamPartitionSet;
-  }
-
-  /**
-   * Get the SystemStreams for the configured broadcast streams.
-   *
-   * @return the set of SystemStreams for which there are broadcast stream 
SSPs configured.
-   */
-  public Set<SystemStream> getBroadcastSystemStreams() {
-    Set<SystemStream> broadcastSS = new HashSet<>();
-    Set<SystemStreamPartition> broadcastSSPs = 
getBroadcastSystemStreamPartitions();
-    for (SystemStreamPartition bssp : broadcastSSPs) {
-      broadcastSS.add(bssp.getSystemStream());
-    }
-    return Collections.unmodifiableSet(broadcastSS);
-  }
-
-  /**
-   * Get the SystemStreams for the configured input and broadcast streams.
-   *
-   * @return the set of SystemStreams for both standard inputs and broadcast 
stream inputs.
-   */
-  public Set<SystemStream> getAllInputStreams() {
-    Set<SystemStream> allInputSS = new HashSet<>();
-
-    TaskConfig taskConfig = TaskConfig.Config2Task(this);
-    allInputSS.addAll((Set<? extends SystemStream>) 
JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
-    allInputSS.addAll(getBroadcastSystemStreams());
-
-    return Collections.unmodifiableSet(allInputSS);
-  }
-
-  /**
-   * Returns a value indicating how long to wait for the tasks to shutdown
-   * If the value is not defined in the config or if does not parse correctly, 
we return the default value -
-   * {@value #DEFAULT_TASK_SHUTDOWN_MS}
-   *
-   * @return Long value indicating how long to wait for all the tasks to 
shutdown
-   */
-  public long getShutdownMs() {
-    String shutdownMs = get(TASK_SHUTDOWN_MS);
-    try {
-      return Long.parseLong(shutdownMs);
-    } catch (NumberFormatException nfe) {
-      LOGGER.warn(String.format(
-          "Unable to parse user-configure value for %s - %s. Using default 
value %d",
-          TASK_SHUTDOWN_MS,
-          shutdownMs,
-          DEFAULT_TASK_SHUTDOWN_MS));
-      return DEFAULT_TASK_SHUTDOWN_MS;
-    }
-  }
-}
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index b50a270..0e0a01c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -41,14 +41,14 @@ public class RunLoopFactory {
       ExecutorService threadPool,
       long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics,
-      TaskConfig config,
+      TaskConfig taskConfig,
       HighResolutionClock clock) {
 
-    long taskWindowMs = config.getWindowMs();
+    long taskWindowMs = taskConfig.getWindowMs();
 
     log.info("Got window milliseconds: {}.", taskWindowMs);
 
-    long taskCommitMs = config.getCommitMs();
+    long taskCommitMs = taskConfig.getCommitMs();
 
     log.info("Got commit milliseconds: {}.", taskCommitMs);
 
@@ -64,16 +64,16 @@ public class RunLoopFactory {
       throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not 
supported");
     }
 
-    int taskMaxConcurrency = config.getMaxConcurrency();
+    int taskMaxConcurrency = taskConfig.getMaxConcurrency();
     log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);
 
-    boolean isAsyncCommitEnabled = config.getAsyncCommit();
+    boolean isAsyncCommitEnabled = taskConfig.getAsyncCommit();
     log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);
 
-    long callbackTimeout = config.getCallbackTimeoutMs();
+    long callbackTimeout = taskConfig.getCallbackTimeoutMs();
     log.info("Got callbackTimeout: {}.", callbackTimeout);
 
-    long maxIdleMs = config.getMaxIdleMs();
+    long maxIdleMs = taskConfig.getMaxIdleMs();
     log.info("Got maxIdleMs: {}.", maxIdleMs);
 
     log.info("Run loop in asynchronous mode.");
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
index d3c5080..794a1f9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
@@ -29,7 +29,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -73,7 +73,7 @@ class AllSspToSingleTaskGrouper implements 
SystemStreamPartitionGrouper {
 public class AllSspToSingleTaskGrouperFactory implements 
SystemStreamPartitionGrouperFactory {
   @Override
   public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config) {
-    if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) {
+    if (!(new TaskConfig(config).getBroadcastSystemStreams().isEmpty())) {
       throw new ConfigException("The job configured with 
AllSspToSingleTaskGrouper cannot have broadcast streams.");
     }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 9542791..154aa67 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -38,7 +38,7 @@ public class GroupByPartition implements 
SystemStreamPartitionGrouper {
   private final Set<SystemStreamPartition> broadcastStreams;
 
   public GroupByPartition(Config config) {
-    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    TaskConfig taskConfig = new TaskConfig(config);
     this.broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index 90828fd..9f3fbf1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -37,7 +37,7 @@ public class GroupBySystemStreamPartition implements 
SystemStreamPartitionGroupe
   private final Set<SystemStreamPartition> broadcastStreams;
 
   public GroupBySystemStreamPartition(Config config) {
-    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    TaskConfig taskConfig = new TaskConfig(config);
     broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
index a24a828..29b27c5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
@@ -29,7 +29,7 @@ import java.util.HashSet;
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.GrouperMetadata;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -55,7 +55,7 @@ public class SSPGrouperProxy {
     Preconditions.checkNotNull(config);
     Preconditions.checkNotNull(grouper);
     this.grouper = grouper;
-    this.broadcastSystemStreamPartitions = new 
TaskConfigJava(config).getBroadcastSystemStreamPartitions();
+    this.broadcastSystemStreamPartitions = new 
TaskConfig(config).getBroadcastSystemStreamPartitions();
     this.systemStreamPartitionMapper = getSystemStreamPartitionMapper(config, 
classLoader);
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 97037a5..148e942 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -21,7 +21,7 @@ package org.apache.samza.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.storage.ChangelogStreamManager;
@@ -41,8 +41,8 @@ public class MetadataResourceUtil {
    */
   public MetadataResourceUtil(JobModel jobModel, MetricsRegistry 
metricsRegistry, ClassLoader classLoader) {
     this.jobModel = jobModel;
-    this.checkpointManager =
-        new 
TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metricsRegistry, 
classLoader);
+    TaskConfig taskConfig = new TaskConfig(jobModel.getConfig());
+    this.checkpointManager = taskConfig.getCheckpointManager(metricsRegistry, 
classLoader).orElse(null);
   }
 
   @VisibleForTesting
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 15a1801..fdcce38 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -41,7 +41,6 @@ import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -164,7 +163,7 @@ import org.slf4j.LoggerFactory;
     configureTables(generatedConfig, originalConfig, reachableTables, inputs);
 
     // generate the task.inputs configuration
-    generatedConfig.put(TaskConfig.INPUT_STREAMS(), 
Joiner.on(',').join(inputs));
+    generatedConfig.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
 
     LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), 
generatedConfig);
 
@@ -182,11 +181,11 @@ import org.slf4j.LoggerFactory;
     if (broadcastStreams.isEmpty()) {
       return;
     }
-    String broadcastInputs = 
config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
+    String broadcastInputs = config.get(TaskConfig.BROADCAST_INPUT_STREAMS);
     if (StringUtils.isNotBlank(broadcastInputs)) {
       broadcastStreams.add(broadcastInputs);
     }
-    configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, 
Joiner.on(',').join(broadcastStreams));
+    configs.put(TaskConfig.BROADCAST_INPUT_STREAMS, 
Joiner.on(',').join(broadcastStreams));
   }
 
   private void configureWindowInterval(Map<String, String> configs, Config 
config,
@@ -200,7 +199,7 @@ import org.slf4j.LoggerFactory;
     long triggerInterval = computeTriggerInterval(reachableOperators);
     LOG.info("Using triggering interval: {}", triggerInterval);
 
-    configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
+    configs.put(TaskConfig.WINDOW_MS, String.valueOf(triggerInterval));
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f495870..577964e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -78,11 +78,11 @@ public abstract class JobPlanner {
     // TODO: This should all be consolidated with ExecutionPlanner after 
fixing SAMZA-1811
     // Don't generate any configurations for LegacyTaskApplications
     if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
-      if (userConfig.containsKey(TaskConfig.INPUT_STREAMS())) {
+      if (userConfig.containsKey(TaskConfig.INPUT_STREAMS)) {
         LOG.warn("SamzaApplications should not specify task.inputs in 
configuration. " +
             "Specify them using InputDescriptors instead. Ignoring configured 
task.inputs value of " +
-            userConfig.get(TaskConfig.INPUT_STREAMS()));
-        allowedUserConfig.remove(TaskConfig.INPUT_STREAMS());
+            userConfig.get(TaskConfig.INPUT_STREAMS));
+        allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
       }
       generatedConfig.putAll(getGeneratedConfig(runId));
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 6c308af..fe0d9fd 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.checkpoint.CheckpointManagerFactory;
 import org.apache.samza.config.*;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.StreamSpec;
@@ -38,13 +37,11 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 
-import static org.apache.samza.util.ScalaJavaUtil.defaultValue;
 
 public class StreamManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamManager.class);
@@ -127,14 +124,8 @@ public class StreamManager {
 
       //Find checkpoint stream and clean up
       TaskConfig taskConfig = new TaskConfig(prevConfig);
-      String checkpointManagerFactoryClassName = 
taskConfig.getCheckpointManagerFactory()
-          .getOrElse(defaultValue(null));
-      if (checkpointManagerFactoryClassName != null) {
-        CheckpointManager checkpointManager =
-            ReflectionUtil.getObj(classLoader, 
checkpointManagerFactoryClassName, CheckpointManagerFactory.class)
-                .getCheckpointManager(prevConfig, new MetricsRegistryMap());
-        checkpointManager.clearCheckpoints();
-      }
+      taskConfig.getCheckpointManager(new MetricsRegistryMap(), classLoader)
+          .ifPresent(CheckpointManager::clearCheckpoints);
 
       //Find changelog streams and remove them
       StorageConfig storageConfig = new StorageConfig(prevConfig);
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 6e45a02..3ba200b 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -37,7 +37,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.context.ApplicationContainerContext;
@@ -247,7 +247,7 @@ public class StreamProcessor {
     this.applicationDefinedContainerContextFactoryOptional = 
applicationDefinedContainerContextFactoryOptional;
     this.applicationDefinedTaskContextFactoryOptional = 
applicationDefinedTaskContextFactoryOptional;
     this.externalContextOptional = externalContextOptional;
-    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+    this.taskShutdownMs = new TaskConfig(config).getShutdownMs();
     this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : 
createJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 3c05228..6060720 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -29,7 +29,7 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.SystemConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.reporter.Metrics;
@@ -111,7 +111,7 @@ public class DiagnosticsUtil {
       SystemFactory systemFactory = 
Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
       SystemProducer systemProducer = 
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new 
MetricsRegistryMap());
       DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, 
jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
-          samzaVersion, hostName, diagnosticsSystemStream, systemProducer, 
Duration.ofMillis(new TaskConfigJava(config).getShutdownMs()));
+          samzaVersion, hostName, diagnosticsSystemStream, systemProducer, 
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
 
       MetricsSnapshotReporter diagnosticsReporter =
           new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream, 
publishInterval, jobName, jobId,
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index fcc73ef..b0b9836 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -34,7 +34,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.GrouperMetadata;
@@ -368,7 +368,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   StreamPartitionCountMonitor getPartitionCountMonitor() {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfigJava(config).getAllInputStreams();
+    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfig(config).getAllInputStreams();
 
     return new StreamPartitionCountMonitor(
             inputStreamsToMonitor,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 31cbda5..7c0cedd 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -21,12 +21,12 @@ package org.apache.samza.checkpoint
 
 import java.net.URI
 import java.util
+import java.util.function.Supplier
 import java.util.regex.Pattern
 
 import joptsimple.ArgumentAcceptingOptionSpec
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.JobRunner.info
@@ -42,6 +42,7 @@ import 
org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, Names
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.execution.JobPlanner
 import org.apache.samza.storage.ChangelogStreamManager
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 import scala.collection.mutable.ListBuffer
 
@@ -158,14 +159,12 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap, 
coordinatorStreamStore
     combinedConfigMap.putAll(userDefinedConfig)
     val combinedConfig: Config = new MapConfig(combinedConfigMap)
 
+    val taskConfig = new TaskConfig(combinedConfig)
     // Instantiate the checkpoint manager with coordinator stream 
configuration.
-    val checkpointManager: CheckpointManager = 
combinedConfig.getCheckpointManagerFactory() match {
-      case Some(className) =>
-        ReflectionUtil.getObj(classLoader, className, 
classOf[CheckpointManagerFactory])
-          .getCheckpointManager(combinedConfig, new MetricsRegistryMap)
-      case _ =>
-        throw new SamzaException("Configuration: task.checkpoint.factory is 
not defined.")
-    }
+    val checkpointManager: CheckpointManager =
+      JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new 
MetricsRegistryMap, classLoader))
+        .toOption
+        .getOrElse(throw new SamzaException("Configuration: 
task.checkpoint.factory is not defined."))
     try {
       // Find all the TaskNames that would be generated for this job config
       val changelogManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 5af55c7..3dcdf05 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -21,9 +21,8 @@ package org.apache.samza.config
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.JobConfig.REGEX_RESOLVED_STREAMS
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.system.{StreamMetadataCache, SystemAdmins, 
SystemStream}
-import org.apache.samza.util.{Logging, StreamUtil, SystemClock}
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.{Logging, StreamUtil}
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -56,7 +55,8 @@ class RegExTopicGenerator extends ConfigRewriter with Logging 
{
       .getRegexResolvedSystem(rewriterName)
       .getOrElse(throw new SamzaException("No system defined for %s." format 
rewriterName))
     val topics = getTopicsFromSystemAdmin(rewriterName, config)
-    val existingInputStreams = config.getInputStreams
+    val taskConfig = new TaskConfig(config)
+    val existingInputStreams = 
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
     val newInputStreams = new mutable.HashSet[SystemStream]
     val keysAndValsToAdd = new mutable.HashMap[String, String]
 
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
deleted file mode 100644
index a887355..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStream
-import org.apache.samza.util.{Logging, StreamUtil}
-
-object TaskConfig {
-  // task config constants
-  val INPUT_STREAMS = "task.inputs" // streaming.input-streams
-  val WINDOW_MS = "task.window.ms" // window period in milliseconds
-  val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
-  val SHUTDOWN_MS = "task.shutdown.ms" // how long to wait for a clean shutdown
-  val TASK_CLASS = "task.class" // streaming.task-factory-class
-  val COMMAND_BUILDER = "task.command.class" // streaming.task-factory-class
-  val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
-  val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // 
task.lifecycle.listener.li-generator.class
-  val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY 
// class name to use when sending offset checkpoints
-  val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
-  val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" // 
define whether drop the messages or not when deserialization fails
-  val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define 
whether drop the messages or not when serialization fails
-  val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore 
producer errors and drop the messages that failed to send
-  val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore 
in process and window
-  val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task 
grouper
-  val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent 
process for a AsyncStreamTask
-  val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms"  // timeout period for 
triggering a callback
-  val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a 
AsyncStreamTask
-  val MAX_IDLE_MS = "task.max.idle.ms"  // maximum time to wait for a task 
worker to complete when there are no new messages to handle
-
-  val DEFAULT_WINDOW_MS: Long = -1L
-  val DEFAULT_COMMIT_MS = 60000L
-  val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
-  val DEFAULT_MAX_CONCURRENCY: Int = 1
-  val DEFAULT_MAX_IDLE_MS: Long = 10
-
-  /**
-   * Samza's container polls for more messages under two conditions. The first
-   * condition arises when there are simply no remaining buffered messages to
-   * process for any input SystemStreamPartition. The second condition arises
-   * when some input SystemStreamPartitions have empty buffers, but some do
-   * not. In the latter case, a polling interval is defined to determine how
-   * often to refresh the empty SystemStreamPartition buffers. By default,
-   * this interval is 50ms, which means that any empty SystemStreamPartition
-   * buffer will be refreshed at least every 50ms. A higher value here means
-   * that empty SystemStreamPartitions will be refreshed less often, which
-   * means more latency is introduced, but less CPU and network will be used.
-   * Decreasing this value means that empty SystemStreamPartitions are
-   * refreshed more frequently, thereby introducing less latency, but
-   * increasing CPU and network utilization.
-   */
-  val POLL_INTERVAL_MS = "task.poll.interval.ms"
-
-  implicit def Config2Task(config: Config) = new TaskConfig(config)
-}
-
-class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  val javaTaskConfig = new TaskConfigJava(config)
-
-  def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
-    case Some(streams) => if (streams.length > 0) {
-      streams.split(",").map(systemStreamNames => {
-        StreamUtil.getSystemStreamFromNames(systemStreamNames.trim)
-      }).toSet
-    } else {
-      Set[SystemStream]()
-    }
-    case _ => Set[SystemStream]()
-  }
-
-  def getWindowMs: Long = getOption(TaskConfig.WINDOW_MS) match {
-    case Some(ms) => ms.toLong
-    case _ => TaskConfig.DEFAULT_WINDOW_MS
-  }
-
-  def getCommitMs: Long = getOption(TaskConfig.COMMIT_MS) match {
-    case Some(ms) => ms.toLong
-    case _ => TaskConfig.DEFAULT_COMMIT_MS
-  }
-
-  def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
-    case Some(ms) => Some(ms.toLong)
-    case _ => None
-  }
-
-  def getTaskClass = getOption(TaskConfig.TASK_CLASS)
-
-  def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
-
-  def getCommandClass(defaultValue: String) = 
getOrElse(TaskConfig.COMMAND_BUILDER, defaultValue)
-
-  def getCheckpointManagerFactory() = 
Option(javaTaskConfig.getCheckpointManagerFactoryName)
-
-  def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
-
-  def getDropDeserializationErrors = 
getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false)
-
-  def getDropSerializationErrors = 
getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false)
-
-  def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS, 
false)
-
-  def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
-
-  def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
-
-  def getTaskNameGrouperFactory = {
-    getOption(TaskConfig.GROUPER_FACTORY) match {
-      case Some(grouperFactory) => grouperFactory
-      case _ =>
-        info("No %s configuration, using 
'org.apache.samza.container.grouper.task.GroupByContainerCountFactory'" format 
TaskConfig.GROUPER_FACTORY)
-        "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
-    }
-  }
-
-  def getMaxConcurrency: Int = getOption(TaskConfig.MAX_CONCURRENCY) match {
-    case Some(count) => count.toInt
-    case _ => TaskConfig.DEFAULT_MAX_CONCURRENCY
-  }
-
-  def getCallbackTimeoutMs: Long = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) 
match {
-    case Some(ms) => ms.toLong
-    case _ => TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS
-  }
-
-  def getAsyncCommit: Boolean = getOption(TaskConfig.ASYNC_COMMIT) match {
-    case Some(asyncCommit) => asyncCommit.toBoolean
-    case _ => false
-  }
-
-  def isAutoCommitEnabled: Boolean = getOption(TaskConfig.COMMIT_MS) match {
-    case Some(commitMs) => commitMs.toInt > 0
-    case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
-  }
-
-  def getMaxIdleMs: Long = getOption(TaskConfig.MAX_IDLE_MS) match {
-    case Some(ms) => ms.toLong
-    case _ => TaskConfig.DEFAULT_MAX_IDLE_MS
-  }
-}
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e1cd92a..9734cb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,6 @@ package org.apache.samza.container
 
 import java.io.File
 import java.lang.management.ManagementFactory
-import java.lang.reflect.InvocationTargetException
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.time.Duration
@@ -31,12 +30,11 @@ import java.util.concurrent.{ExecutorService, Executors, 
ScheduledExecutorServic
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
+import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager, 
OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, 
DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, 
PollingScanDiskSpaceMonitor}
@@ -50,7 +48,7 @@ import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage._
 import org.apache.samza.system._
-import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, 
RoundRobinChooserFactory}
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory}
 import org.apache.samza.table.TableManager
 import org.apache.samza.task._
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -403,7 +401,8 @@ object SamzaContainer extends Logging {
 
     info("Setting up message chooser.")
 
-    val chooserFactoryClassName = 
config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
+    val taskConfig = new TaskConfig(config)
+    val chooserFactoryClassName = taskConfig.getMessageChooserClass
 
     val chooserFactory = ReflectionUtil.getObj(classLoader, 
chooserFactoryClassName, classOf[MessageChooserFactory])
 
@@ -424,11 +423,7 @@ object SamzaContainer extends Logging {
     }
     info("Got security manager: %s" format securityManager)
 
-    val checkpointManager = config.getCheckpointManagerFactory()
-      .filterNot(_.isEmpty)
-      .map(ReflectionUtil.getObj(classLoader, _, 
classOf[CheckpointManagerFactory])
-        .getCheckpointManager(config, samzaContainerMetrics.registry))
-      .orNull
+    val checkpointManager = 
taskConfig.getCheckpointManager(samzaContainerMetrics.registry, 
classLoader).orElse(null)
     info("Got checkpoint manager: %s" format checkpointManager)
 
     // create a map of consumers with callbacks to pass to the OffsetManager
@@ -439,13 +434,10 @@ object SamzaContainer extends Logging {
     val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, startpointManager, systemAdmins, checkpointListeners, 
offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
-    val dropDeserializationError = config.getDropDeserializationErrors
-    val dropSerializationError = config.getDropSerializationErrors
+    val dropDeserializationError = taskConfig.getDropDeserializationErrors
+    val dropSerializationError = taskConfig.getDropSerializationErrors
 
-    val pollIntervalMs = config
-      .getPollIntervalMs
-      .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
-      .toInt
+    val pollIntervalMs = taskConfig.getPollIntervalMs
 
     val consumerMultiplexer = new SystemConsumers(
       chooser = chooser,
@@ -585,7 +577,7 @@ object SamzaContainer extends Logging {
           storageManager = storageManager,
           tableManager = tableManager,
           systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
-          exceptionHandler = 
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
+          exceptionHandler = 
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, taskConfig),
           jobModel = jobModel,
           streamMetadataCache = streamMetadataCache,
           timerExecutor = timerExecutor,
@@ -609,7 +601,7 @@ object SamzaContainer extends Logging {
       taskThreadPool,
       maxThrottlingDelayMs,
       samzaContainerMetrics,
-      config,
+      taskConfig,
       clock)
 
     val memoryStatisticsMonitor : SystemStatisticsMonitor = new 
StatisticsMonitorImpl()
@@ -713,7 +705,8 @@ class SamzaContainer(
   containerStorageManager: ContainerStorageManager,
   diagnosticsManager: Option[DiagnosticsManager] = Option.empty) extends 
Runnable with Logging {
 
-  val shutdownMs = 
config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+  private val taskConfig = new TaskConfig(config)
+  val shutdownMs: Long = taskConfig.getShutdownMs
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
index 99b729f..7216b4c 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
@@ -19,11 +19,11 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.config.Config
-import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.TaskConfig
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.util.Logging
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 /**
  * Handles exceptions thrown in a {@link TaskInstance}'s process or window
@@ -88,16 +88,15 @@ object TaskInstanceExceptionHandler {
    * Creates a new TaskInstanceExceptionHandler using the provided
    * configuration.
    *
-   * @param metrics The {@link TaskInstanceMetrics} used to track exception
-   *        counts.
-   * @param config The configuration to read the list of ignored exceptions
-   *        from.
+   * @param metrics The {@link TaskInstanceMetrics} used to track exception 
counts.
+   * @param taskConfig The configuration to read the list of ignored 
exceptions from.
    */
-  def apply(metrics: MetricsHelper, config: Config) =
+  def apply(metrics: MetricsHelper, taskConfig: TaskConfig) = {
     new TaskInstanceExceptionHandler(
       metrics = metrics,
-      ignoredExceptions = config.getIgnoredExceptions match {
+      ignoredExceptions = 
JavaOptionals.toRichOptional(taskConfig.getIgnoredExceptions).toOption match {
         case Some(exceptions) => exceptions.split(",").toSet
         case _ => Set[String]()
       })
+  }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 06fdced..04a3ec2 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.Config
 import org.apache.samza.container.grouper.stream.SSPGrouperProxy
 import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -50,6 +49,7 @@ import org.apache.samza.runtime.LocationId
 import org.apache.samza.system._
 import org.apache.samza.util.{Logging, ReflectionUtil, Util}
 
+import scala.collection.JavaConverters
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -270,7 +270,6 @@ object JobModelManager extends Logging {
     */
   private def getInputStreamPartitions(config: Config, streamMetadataCache: 
StreamMetadataCache): Set[SystemStreamPartition] = {
 
-
     def invokeRegexTopicRewriter(config: Config): Config = {
       config.getConfigRewriters match {
         case Some(rewriters) => rewriters.split(",").
@@ -282,8 +281,11 @@ object JobModelManager extends Logging {
       }
     }
 
+    val configAfterRegexTopicRewrite = invokeRegexTopicRewriter(config)
+    val taskConfigAfterRegexTopicRewrite = new 
TaskConfig(configAfterRegexTopicRewrite)
     // Expand regex input, if a regex-rewriter is defined in config
-    val inputSystemStreams = invokeRegexTopicRewriter(config).getInputStreams
+    val inputSystemStreams =
+      
JavaConverters.asScalaSetConverter(taskConfigAfterRegexTopicRewrite.getInputStreams).asScala.toSet
 
     // Get the set of partitions for each SystemStream from the stream metadata
     streamMetadataCache
@@ -353,6 +355,7 @@ object JobModelManager extends Logging {
                    streamMetadataCache: StreamMetadataCache,
                    grouperMetadata: GrouperMetadata,
                    classLoader: ClassLoader): JobModel = {
+    val taskConfig = new TaskConfig(config)
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, 
streamMetadataCache, classLoader)
 
@@ -397,7 +400,7 @@ object JobModelManager extends Logging {
     // Here is where we should put in a pluggable option for the
     // SSPTaskNameGrouper for locality, load-balancing, etc.
     val containerGrouperFactory =
-      ReflectionUtil.getObj(classLoader, config.getTaskNameGrouperFactory, 
classOf[TaskNameGrouperFactory])
+      ReflectionUtil.getObj(classLoader, taskConfig.getTaskNameGrouperFactory, 
classOf[TaskNameGrouperFactory])
     val standbyTasksEnabled = new JobConfig(config).getStandbyTasksEnabled
     val standbyTaskReplicationFactor = new 
JobConfig(config).getStandbyTaskReplicationFactor
     val taskNameGrouperProxy = new 
TaskNameGrouperProxy(containerGrouperFactory.build(config), 
standbyTasksEnabled, standbyTaskReplicationFactor)
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 72a0fff..665cd15 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,8 +22,7 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.config.{Config, JobConfig}
+import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
@@ -88,8 +87,9 @@ class ProcessJobFactory extends StreamJobFactory with Logging 
{
     val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
     info("Process job. using fwkPath = " + fwkPath)
 
-    val commandBuilderClass = 
config.getCommandClass(classOf[ShellCommandBuilder].getName)
-    info("Using command builder class " + commandBuilderClass)
+    val taskConfig = new TaskConfig(config)
+    val commandBuilderClass = 
taskConfig.getCommandClass(classOf[ShellCommandBuilder].getName)
+    info("Using command builder class %s" format commandBuilderClass)
     val commandBuilder = ReflectionUtil.getObj(classLoader, 
commandBuilderClass, classOf[CommandBuilder])
 
     // JobCoordinator is stopped by ProcessJob when it exits
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index b698583..f9116fe 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,7 +23,7 @@ import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
+import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.context.{ExternalContext, JobContextImpl}
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 957cd6f..74ffcc8 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -21,12 +21,11 @@ package org.apache.samza.metrics.reporter
 
 import org.apache.samza.util.{Logging, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{ApplicationConfig, Config, SystemConfig}
+import org.apache.samza.config.{Config, SystemConfig}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.metrics.MetricsRegistryMap
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index fbaf23d..980b41f 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -250,7 +250,7 @@ public class ContainerStorageManager {
       sideInputSystemConsumers =
           new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
               sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
-              SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
+              TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
 
   }
@@ -661,6 +661,7 @@ public class ContainerStorageManager {
     getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.init());
 
     // start the checkpointing thread at the commit-ms frequency
+    TaskConfig taskConfig = new TaskConfig(config);
     sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new 
Runnable() {
       @Override
       public void run() {
@@ -671,7 +672,7 @@ public class ContainerStorageManager {
           sideInputException = Optional.of(e);
         }
       }
-    }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
+    }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
 
     // set the latch to the number of sideInput SSPs
     this.sideInputsCaughtUp = new 
CountDownLatch(this.sideInputStorageManagers.keySet().size());
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 563a104..b41c245 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -28,14 +28,15 @@ import java.util.HashMap
 import java.util.HashSet
 import java.util.Queue
 import java.util.Set
+
 import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtil}
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
+import org.apache.samza.config.TaskConfig
 
 object SystemConsumers {
-  val DEFAULT_POLL_INTERVAL_MS = 50
   val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
   val DEFAULT_DROP_SERIALIZATION_ERROR = false
 }
@@ -105,7 +106,7 @@ class SystemConsumers (
    * with no remaining unprocessed messages, the SystemConsumers will poll for
    * it within 50ms of its availability in the stream system.</p>
    */
-  val pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
+  val pollIntervalMs: Int = TaskConfig.DEFAULT_POLL_INTERVAL_MS,
 
   /**
    * Clock can be used to inject a custom clock when mocking this class in
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index 9b77a8c..c5e102a 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.system.chooser
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
+import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfig}
 import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
@@ -40,7 +40,7 @@ object DefaultChooser extends Logging {
     debug("Got batch size: %s" format batchSize)
 
     // Normal streams default to priority 0.
-    val defaultPrioritizedStreams = new TaskConfigJava(config)
+    val defaultPrioritizedStreams = new TaskConfig(config)
       .getAllInputStreams.asScala
       .map((_, 0))
       .toMap
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e31d82f..7ee96de 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -30,11 +30,11 @@ import java.net.InetAddress
 import java.net.NetworkInterface
 import java.util.Random
 
-
 import scala.collection.JavaConverters._
 
 
 object Util extends Logging {
+  private val FALLBACK_VERSION = "0.0.1"
   val Random = new Random
 
   /**
@@ -72,20 +72,29 @@ object Util extends Logging {
   def getSamzaVersion(): String = {
     Option(this.getClass.getPackage.getImplementationVersion)
       .getOrElse({
-        warn("Unable to find implementation samza version in jar's meta info. 
Defaulting to 0.0.1.")
-        "0.0.1"
+        warn("Unable to find implementation samza version in jar's meta info. 
Defaulting to %s" format FALLBACK_VERSION)
+        FALLBACK_VERSION
       })
   }
 
   def getTaskClassVersion(config: Config): String = {
     try {
-      val taskClass = Option(new ApplicationConfig(config).getAppClass())
-        .orElse(new TaskConfig(config).getTaskClass).get
-      Class.forName(taskClass).getPackage.getImplementationVersion
+      val appClass = Option(new ApplicationConfig(config).getAppClass)
+      if (appClass.isDefined) {
+        Class.forName(appClass.get).getPackage.getImplementationVersion
+      } else {
+        val taskClass = new TaskConfig(config).getTaskClass
+        if (taskClass.isPresent) {
+          Class.forName(taskClass.get()).getPackage.getImplementationVersion
+        } else {
+          warn("Unable to find app class or task class. Defaulting to %s" 
format FALLBACK_VERSION)
+          FALLBACK_VERSION
+        }
+      }
     } catch {
       case e: Exception => {
-        warn("Unable to find implementation version in jar's meta info. 
Defaulting to 0.0.1.")
-        "0.0.1"
+        warn("Unable to find implementation version in jar's meta info. 
Defaulting to %s" format FALLBACK_VERSION)
+        FALLBACK_VERSION
       }
     }
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
 
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index f620ece..1d0bd88 100644
--- 
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -24,9 +24,9 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.task.MockStreamTask;
 import org.junit.Test;
 
@@ -45,7 +45,7 @@ public class TestApplicationUtil {
     SamzaApplication app = ApplicationUtil.fromConfig(new 
MapConfig(configMap));
     assertTrue(app instanceof MockStreamApplication);
 
-    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
     app = ApplicationUtil.fromConfig(new MapConfig(configMap));
     assertTrue(app instanceof MockStreamApplication);
   }
@@ -57,7 +57,7 @@ public class TestApplicationUtil {
     SamzaApplication app = ApplicationUtil.fromConfig(new 
MapConfig(configMap));
     assertTrue(app instanceof MockTaskApplication);
 
-    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
     app = ApplicationUtil.fromConfig(new MapConfig(configMap));
     assertTrue(app instanceof MockTaskApplication);
   }
@@ -65,7 +65,7 @@ public class TestApplicationUtil {
   @Test
   public void testTaskClassOnly() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    configMap.put(TaskConfig.TASK_CLASS, MockStreamTask.class.getName());
     Config config = new MapConfig(configMap);
     SamzaApplication app = ApplicationUtil.fromConfig(config);
     assertTrue(app instanceof TaskApplication);
@@ -76,7 +76,7 @@ public class TestApplicationUtil {
   @Test(expected = ConfigException.class)
   public void testEmptyTaskClassOnly() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TASK_CLASS(), "");
+    configMap.put(TaskConfig.TASK_CLASS, "");
     ApplicationUtil.fromConfig(new MapConfig(configMap));
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
new file mode 100644
index 0000000..be3e4d5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.junit.Test;
+
+
+public class TestTaskConfig {
+  @Test
+  public void testGetInputStreams() {
+    Config config = new MapConfig(
+        ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo, kafka.bar, 
otherKafka.bar, otherKafka.foo.bar"));
+    Set<SystemStream> expected = ImmutableSet.of(
+        new SystemStream("kafka", "foo"),
+        new SystemStream("kafka", "bar"),
+        new SystemStream("otherKafka", "bar"),
+        new SystemStream("otherKafka", "foo.bar"));
+    assertEquals(expected, new TaskConfig(config).getAllInputStreams());
+
+    // empty string for value
+    MapConfig configEmptyInput = new 
MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, ""));
+    assertTrue(new TaskConfig(configEmptyInput).getInputStreams().isEmpty());
+    // config not specified
+    assertTrue(new TaskConfig(new MapConfig()).getInputStreams().isEmpty());
+  }
+
+  @Test
+  public void testGetWindowMs() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "10"));
+    assertEquals(10, new TaskConfig(config).getWindowMs());
+
+    config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "-1"));
+    assertEquals(-1, new TaskConfig(config).getWindowMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_WINDOW_MS, new TaskConfig(new 
MapConfig()).getWindowMs());
+  }
+
+  @Test
+  public void testGetCommitMs() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "10"));
+    assertEquals(10, new TaskConfig(config).getCommitMs());
+
+    config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "-1"));
+    assertEquals(-1, new TaskConfig(config).getCommitMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_COMMIT_MS, new TaskConfig(new 
MapConfig()).getCommitMs());
+  }
+
+  @Test
+  public void testGetTaskClass() {
+    String taskClass = "some.task.class";
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, 
taskClass));
+    assertEquals(Optional.of(taskClass), new 
TaskConfig(config).getTaskClass());
+
+    // config not specified
+    assertFalse(new TaskConfig(new MapConfig()).getTaskClass().isPresent());
+  }
+
+  @Test
+  public void testGetCommandClass() {
+    String commandClass = "some.command.class";
+    String defaultCommandClass = "default.command.class";
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMAND_BUILDER, 
commandClass));
+    assertEquals(commandClass, new 
TaskConfig(config).getCommandClass(defaultCommandClass));
+
+    // config not specified
+    assertEquals(defaultCommandClass, new TaskConfig(new 
MapConfig()).getCommandClass(defaultCommandClass));
+  }
+
+  @Test
+  public void testGetMessageChooserClass() {
+    String messageChooserClassValue = "some.message.chooser.class";
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME, 
messageChooserClassValue));
+    assertEquals(messageChooserClassValue, new 
TaskConfig(config).getMessageChooserClass());
+
+    // config not specified
+    assertEquals(RoundRobinChooserFactory.class.getName(), new TaskConfig(new 
MapConfig()).getMessageChooserClass());
+  }
+
+  @Test
+  public void testGetDropDeserializationErrors() {
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "true"));
+    assertTrue(new TaskConfig(config).getDropDeserializationErrors());
+
+    config = new 
MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "false"));
+    assertFalse(new TaskConfig(config).getDropDeserializationErrors());
+
+    // config not specified
+    assertFalse(new TaskConfig(new 
MapConfig()).getDropDeserializationErrors());
+  }
+
+  @Test
+  public void testGetDropSerializationErrors() {
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "true"));
+    assertTrue(new TaskConfig(config).getDropSerializationErrors());
+
+    config = new 
MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "false"));
+    assertFalse(new TaskConfig(config).getDropSerializationErrors());
+
+    // config not specified
+    assertFalse(new TaskConfig(new MapConfig()).getDropSerializationErrors());
+  }
+
+  @Test
+  public void testGetDropProducerErrors() {
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
+    assertTrue(new TaskConfig(config).getDropProducerErrors());
+
+    config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, 
"false"));
+    assertFalse(new TaskConfig(config).getDropProducerErrors());
+
+    // config not specified
+    assertFalse(new TaskConfig(new MapConfig()).getDropProducerErrors());
+  }
+
+  @Test
+  public void testGetPollIntervalMs() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.POLL_INTERVAL_MS, 
"10"));
+    assertEquals(10, new TaskConfig(config).getPollIntervalMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_POLL_INTERVAL_MS, new TaskConfig(new 
MapConfig()).getPollIntervalMs());
+  }
+
+  @Test
+  public void testGetIgnoredExceptions() {
+    String ignoredExceptionsValue = "exception0.class, exception1.class";
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.IGNORED_EXCEPTIONS, 
ignoredExceptionsValue));
+    assertEquals(Optional.of(ignoredExceptionsValue), new 
TaskConfig(config).getIgnoredExceptions());
+
+    // config not specified
+    assertFalse(new TaskConfig(new 
MapConfig()).getIgnoredExceptions().isPresent());
+  }
+
+  @Test
+  public void testGetTaskNameGrouperFactory() {
+    String taskNameGrouperFactoryValue = "task.name.grouper.factory.class";
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.GROUPER_FACTORY, 
taskNameGrouperFactoryValue));
+    assertEquals(taskNameGrouperFactoryValue, new 
TaskConfig(config).getTaskNameGrouperFactory());
+
+    // config not specified
+    assertEquals(GroupByContainerCountFactory.class.getName(),
+        new TaskConfig(new MapConfig()).getTaskNameGrouperFactory());
+  }
+
+  @Test
+  public void testGetMaxConcurrency() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_CONCURRENCY, 
"10"));
+    assertEquals(10, new TaskConfig(config).getMaxConcurrency());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_MAX_CONCURRENCY, new TaskConfig(new 
MapConfig()).getMaxConcurrency());
+  }
+
+  @Test
+  public void testGetCallbackTimeoutMs() {
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, "10"));
+    assertEquals(10, new TaskConfig(config).getCallbackTimeoutMs());
+
+    config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, 
"-1"));
+    assertEquals(-1, new TaskConfig(config).getCallbackTimeoutMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS, new TaskConfig(new 
MapConfig()).getCallbackTimeoutMs());
+  }
+
+  @Test
+  public void testGetAsyncCommit() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT, 
"true"));
+    assertTrue(new TaskConfig(config).getAsyncCommit());
+
+    config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT, "false"));
+    assertFalse(new TaskConfig(config).getAsyncCommit());
+
+    // config not specified
+    assertFalse(new TaskConfig(new MapConfig()).getAsyncCommit());
+  }
+
+  @Test
+  public void testGetMaxIdleMs() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_IDLE_MS, 
"20"));
+    assertEquals(20, new TaskConfig(config).getMaxIdleMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_MAX_IDLE_MS, new TaskConfig(new 
MapConfig()).getMaxIdleMs());
+  }
+
+  @Test
+  public void testGetCheckpointManager() {
+    Config config =
+        new MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY, 
MockCheckpointManagerFactory.class.getName()));
+    assertTrue(new TaskConfig(config).getCheckpointManager(null, 
getClass().getClassLoader())
+        .get() instanceof MockCheckpointManager);
+
+    Config configEmptyString = new 
MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY, ""));
+    assertFalse(new TaskConfig(configEmptyString).getCheckpointManager(null, 
getClass().getClassLoader()).isPresent());
+
+    assertFalse(new TaskConfig(new MapConfig()).getCheckpointManager(null, 
getClass().getClassLoader()).isPresent());
+  }
+
+  @Test
+  public void testGetBroadcastSystemStreamPartitions() {
+    // no entry for "task.broadcast.inputs"
+    assertEquals(Collections.emptySet(), new TaskConfig(new 
MapConfig()).getBroadcastSystemStreamPartitions());
+
+    HashMap<String, String> map = new HashMap<>();
+    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, 
kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
+    Config config = new MapConfig(map);
+    TaskConfig taskConfig = new TaskConfig(config);
+    Set<SystemStreamPartition> systemStreamPartitionSet = 
taskConfig.getBroadcastSystemStreamPartitions();
+
+    HashSet<SystemStreamPartition> expected = new HashSet<>();
+    expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
+    expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(12)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(13)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(14)));
+    expected.add(new SystemStreamPartition("kafka", "foo.bar", new 
Partition(3)));
+    expected.add(new SystemStreamPartition("kafka", "foo.bar", new 
Partition(4)));
+    assertEquals(expected, systemStreamPartitionSet);
+
+    map.put("task.broadcast.inputs", "kafka.foo");
+    taskConfig = new TaskConfig(new MapConfig(map));
+    boolean catchCorrectException = false;
+    try {
+      taskConfig.getBroadcastSystemStreamPartitions();
+    } catch (IllegalArgumentException e) {
+      catchCorrectException = true;
+    }
+    assertTrue(catchCorrectException);
+
+    map.put("task.broadcast.inputs", 
"kafka.org.apache.events.WhitelistedIps#1-2");
+    taskConfig = new TaskConfig(new MapConfig(map));
+    boolean invalidFormatException = false;
+    try {
+      taskConfig.getBroadcastSystemStreamPartitions();
+    } catch (IllegalArgumentException e) {
+      invalidFormatException = true;
+    }
+    assertTrue(invalidFormatException);
+  }
+
+  @Test
+  public void testGetBroadcastSystemStreams() {
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.BROADCAST_INPUT_STREAMS,
+        "kafka.foo#4, kafka.bar#5, otherKafka.foo#4, otherKafka.foo.bar#5"));
+    Set<SystemStream> expected = ImmutableSet.of(
+        new SystemStream("kafka", "foo"),
+        new SystemStream("kafka", "bar"),
+        new SystemStream("otherKafka", "foo"),
+        new SystemStream("otherKafka", "foo.bar"));
+    assertEquals(expected, new TaskConfig(config).getBroadcastSystemStreams());
+    assertTrue(new TaskConfig(new 
MapConfig()).getBroadcastSystemStreams().isEmpty());
+  }
+
+  @Test
+  public void testGetAllInputStreams() {
+    Config config = new MapConfig(ImmutableMap.of(
+        TaskConfig.INPUT_STREAMS, "kafka.foo, otherKafka.bar",
+        TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
+    Set<SystemStream> expected = ImmutableSet.of(
+        new SystemStream("kafka", "foo"),
+        new SystemStream("otherKafka", "bar"),
+        new SystemStream("kafka", "bar"),
+        new SystemStream("otherKafka", "foo"));
+    assertEquals(expected, new TaskConfig(config).getAllInputStreams());
+
+    Config configOnlyBroadcast = new MapConfig(ImmutableMap.of(
+        TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
+    Set<SystemStream> expectedOnlyBroadcast = ImmutableSet.of(
+        new SystemStream("kafka", "bar"),
+        new SystemStream("otherKafka", "foo"));
+    assertEquals(expectedOnlyBroadcast, new 
TaskConfig(configOnlyBroadcast).getAllInputStreams());
+
+    Config configOnlyInputs = new 
MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo, 
otherKafka.bar"));
+    Set<SystemStream> expectedOnlyInputs = ImmutableSet.of(
+        new SystemStream("kafka", "foo"),
+        new SystemStream("otherKafka", "bar"));
+    assertEquals(expectedOnlyInputs, new 
TaskConfig(configOnlyInputs).getAllInputStreams());
+
+    assertTrue(new TaskConfig(new MapConfig()).getAllInputStreams().isEmpty());
+  }
+
+  @Test
+  public void testGetShutdownMs() {
+    Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, 
"10"));
+    assertEquals(10, new TaskConfig(config).getShutdownMs());
+
+    // unable to parse value into number
+    config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "not a 
number"));
+    assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new 
TaskConfig(config).getShutdownMs());
+
+    // config not specified
+    assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new 
MapConfig()).getShutdownMs());
+  }
+
+  /**
+   * Used for testing classloading a {@link CheckpointManagerFactory}.
+   */
+  public static class MockCheckpointManagerFactory implements 
CheckpointManagerFactory {
+    @Override
+    public CheckpointManager getCheckpointManager(Config config, 
MetricsRegistry registry) {
+      return new MockCheckpointManager();
+    }
+  }
+
+  /**
+   * Placeholder class to be returned by {@link MockCheckpointManagerFactory}.
+   */
+  private static class MockCheckpointManager implements CheckpointManager {
+    @Override
+    public void start() { }
+
+    @Override
+    public void register(TaskName taskName) { }
+
+    @Override
+    public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) { }
+
+    @Override
+    public Checkpoint readLastCheckpoint(TaskName taskName) {
+      return null;
+    }
+
+    @Override
+    public void stop() { }
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
deleted file mode 100644
index baf2d4f..0000000
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-public class TestTaskConfigJava {
-
-  @Test
-  public void testGetBroadcastSystemStreamPartitions() {
-    HashMap<String, String> map = new HashMap<String, String>();
-    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, 
kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
-    Config config = new MapConfig(map);
-
-    TaskConfigJava taskConfig = new TaskConfigJava(config);
-    Set<SystemStreamPartition> systemStreamPartitionSet = 
taskConfig.getBroadcastSystemStreamPartitions();
-
-    HashSet<SystemStreamPartition> expected = new 
HashSet<SystemStreamPartition>();
-    expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
-    expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
-    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(12)));
-    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(13)));
-    expected.add(new SystemStreamPartition("kafka", "z-o-o", new 
Partition(14)));
-    expected.add(new SystemStreamPartition("kafka", "foo.bar", new 
Partition(3)));
-    expected.add(new SystemStreamPartition("kafka", "foo.bar", new 
Partition(4)));
-    assertEquals(expected, systemStreamPartitionSet);
-
-    map.put("task.broadcast.inputs", "kafka.foo");
-    taskConfig = new TaskConfigJava(new MapConfig(map));
-    boolean catchCorrectException = false;
-    try {
-      taskConfig.getBroadcastSystemStreamPartitions();
-    } catch (IllegalArgumentException e) {
-      catchCorrectException = true;
-    }
-    assertTrue(catchCorrectException);
-
-    map.put("task.broadcast.inputs", 
"kafka.org.apache.events.WhitelistedIps#1-2");
-    taskConfig = new TaskConfigJava(new MapConfig(map));
-    boolean invalidFormatException = false;
-    try {
-      taskConfig.getBroadcastSystemStreamPartitions();
-    } catch (IllegalArgumentException e) {
-      invalidFormatException = true;
-    }
-    assertTrue(invalidFormatException);
-  }
-
-  @Test
-  public void testAutoCommitConfig() {
-    // positive values of commit.ms => autoCommit = true
-    Config config1 = new MapConfig(ImmutableMap.of("task.commit.ms", "1"));
-    assertTrue(new TaskConfig(config1).isAutoCommitEnabled());
-
-    // no value for commit.ms => autoCommit = true
-    Config config2 = new MapConfig(ImmutableMap.of());
-    assertTrue(new TaskConfig(config2).isAutoCommitEnabled());
-
-    // A zero value for commit.ms => autoCommit = false
-    Config config3 = new MapConfig(ImmutableMap.of("task.commit.ms", "0"));
-    assertFalse(new TaskConfig(config3).isAutoCommitEnabled());
-
-    // negative value for commit.ms => autoCommit = false
-    Config config4 = new MapConfig(ImmutableMap.of("task.commit.ms", "-1"));
-    assertFalse(new TaskConfig(config4).isAutoCommitEnabled());
-  }
-}
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 4077079..9f7644b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -725,7 +725,7 @@ public class TestExecutionPlanner {
     assertEquals(1, jobConfigs.size());
 
     // GCD of 8, 16, 1600 and 252 is 4
-    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
   }
 
   @Test
@@ -741,7 +741,7 @@ public class TestExecutionPlanner {
     assertEquals(1, jobConfigs.size());
 
     // GCD of 8, 16, 1600 and 252 is 4
-    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
   }
 
   @Test
@@ -755,13 +755,13 @@ public class TestExecutionPlanner {
     ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
-    assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
+    assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS));
   }
 
   @Test
   public void testTriggerIntervalWhenWindowMsIsConfigured() {
     Map<String, String> map = new HashMap<>(config);
-    map.put(TaskConfig.WINDOW_MS(), "2000");
+    map.put(TaskConfig.WINDOW_MS, "2000");
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
 
@@ -770,7 +770,7 @@ public class TestExecutionPlanner {
     ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
-    assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
+    assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS));
   }
 
   @Test
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 0962a14..f60be4f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -30,7 +30,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.storage.SideInputsProcessor;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
@@ -75,7 +74,7 @@ public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase
     Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
     validateJobConfig(expectedJobConfig, jobConfig);
     // additional, check the computed window.ms for join
-    assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS()));
+    assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS));
     Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 5);
     validateStreamConfigures(jobConfig, deserializedSerdes);
     validateJoinStoreConfigures(jobConfig, deserializedSerdes);
@@ -276,10 +275,10 @@ public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase
       }
     }
     if (!inputs.isEmpty()) {
-      configMap.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+      configMap.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
     }
     if (!broadcasts.isEmpty()) {
-      configMap.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, 
Joiner.on(',').join(broadcasts));
+      configMap.put(TaskConfig.BROADCAST_INPUT_STREAMS, 
Joiner.on(',').join(broadcasts));
     }
     return new MapConfig(configMap);
   }
@@ -299,8 +298,8 @@ public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase
     assertEquals(expectedConfig.get(JobConfig.JOB_NAME()), 
jobConfig.getName().get());
     assertEquals(expectedConfig.get(JobConfig.JOB_ID()), jobConfig.getJobId());
     assertEquals("testJobGraphJson", 
jobConfig.get(JobNodeConfigurationGenerator.CONFIG_INTERNAL_EXECUTION_PLAN));
-    assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS()), 
jobConfig.get(TaskConfig.INPUT_STREAMS()));
-    assertEquals(expectedConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS), 
jobConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS));
+    assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS), 
jobConfig.get(TaskConfig.INPUT_STREAMS));
+    assertEquals(expectedConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS), 
jobConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS));
   }
 
   private void validateStreamSerdeConfigure(String streamId, Config config, 
Map<String, Serde> deserializedSerdes) {
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
index ca06b2a..0829a82 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstanceExceptionHandler.scala
@@ -121,7 +121,7 @@ class TestTaskInstanceExceptionHandler extends 
AssertionsForJUnit with MockitoSu
   }
 
   private def build(config: Config): TaskInstanceExceptionHandler = {
-    TaskInstanceExceptionHandler.apply(this.metrics, config)
+    TaskInstanceExceptionHandler.apply(this.metrics, new TaskConfig(config))
   }
 
   /**
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index d498b24..8117a00 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -249,8 +249,8 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
     val getMatchedInputStreamPartitions = 
PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
 
     val allSSP = JobModelManager invokePrivate 
getInputStreamPartitions(config, streamMetadataCache)
-    val matchedSSP = JobModelManager invokePrivate 
getMatchedInputStreamPartitions(config, streamMetadataCache,
-      getClass.getClassLoader)
+    val matchedSSP =
+      JobModelManager invokePrivate getMatchedInputStreamPartitions(config, 
streamMetadataCache, getClass.getClassLoader)
     assertEquals(matchedSSP, allSSP)
   }
 
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 037fbb3..d1fce3e 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -25,8 +25,8 @@ import java.util.Collections
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
+import org.apache.samza.config.TaskConfig
 import org.apache.samza.serializers._
-import org.apache.samza.startpoint._
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MockMessageChooser
@@ -51,7 +51,7 @@ class TestSystemConsumers {
                                         new SerdeManager, new 
SystemConsumersMetrics,
                                         
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
                                         
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-                                        
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
+                                        TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
clock = () => now)
 
     consumers.register(systemStreamPartition0, "0")
     consumers.register(systemStreamPartition1, "1234")
@@ -75,7 +75,7 @@ class TestSystemConsumers {
 
     // Advance the clock to trigger a new poll even though there are still
     // messages.
-    now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+    now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
 
     assertEquals(envelope, consumers.choose())
 
@@ -116,7 +116,7 @@ class TestSystemConsumers {
                                         new SerdeManager, new 
SystemConsumersMetrics,
                                         
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
                                         
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-                                        
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
+                                        TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
clock = () => now)
 
     consumers.register(systemStreamPartition, "0")
     consumers.start
@@ -145,7 +145,7 @@ class TestSystemConsumers {
     assertNull(consumers.choose())
 
     // Increase clock interval.
-    now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+    now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
 
     // We get two messages now.
     assertEquals(envelope, consumers.choose())
@@ -316,7 +316,7 @@ class TestSystemConsumers {
       systemAdmins, new SerdeManager, new SystemConsumersMetrics,
       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-      SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+      TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
     consumers.register(systemStreamPartition1, "0")
     consumers.register(systemStreamPartition2, "0")
@@ -369,7 +369,7 @@ class TestSystemConsumers {
       systemAdmins, new SerdeManager, new SystemConsumersMetrics,
       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-      SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+      TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
     consumers.register(systemStreamPartition1, "0")
   }
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 7d093c3..deb8056 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, 
SystemProducer}
@@ -80,12 +79,13 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
     // for us.
     info("Creating kafka producer for system %s, producerClientId %s" 
format(systemName, clientId))
 
+    val taskConfig = new TaskConfig(config)
     new KafkaSystemProducer(
       systemName,
       new ExponentialSleepStrategy(initialDelayMs = 
producerConfig.reconnectIntervalMs),
       getProducer,
       metrics,
-      dropProducerExceptions = config.getDropProducerErrors)
+      dropProducerExceptions = taskConfig.getDropProducerErrors)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index d48136c..61b7475 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -279,7 +279,7 @@ public class StreamAppender extends AppenderSkeleton {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
+    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
 
     return config;
   }
diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index a719317..b47ddab 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -300,7 +300,7 @@ public class StreamAppender extends AbstractAppender {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
+    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
 
     return config;
   }
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 0434815..181f0f7 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -317,7 +317,7 @@ public class SamzaExecutor implements SqlExecutor {
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId);
     staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId));
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
     String configIOResolverDomain =
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java 
b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index 627dc65..364d0a9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -82,7 +82,7 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
     staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
     String configIOResolverDomain =
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 6737488..2f2e74e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -110,7 +110,7 @@ public class TestRunner {
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
     configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), 
InMemoryMetadataStoreFactory.class.getCanonicalName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
     // Changing the base directory for non-changelog stores used by Samza 
application to separate the
     // on-disk store locations for concurrently executing tests
     configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
@@ -131,7 +131,7 @@ public class TestRunner {
   private TestRunner(Class taskClass) {
     this();
     Preconditions.checkNotNull(taskClass);
-    configs.put(TaskConfig.TASK_CLASS(), taskClass.getName());
+    configs.put(TaskConfig.TASK_CLASS, taskClass.getName());
     this.app = new LegacyTaskApplication(taskClass.getName());
   }
 
@@ -373,11 +373,11 @@ public class TestRunner {
       Map<Integer, Iterable<StreamMessageType>> partitionData) {
     String systemName = descriptor.getSystemName();
     String streamName = (String) 
descriptor.getPhysicalName().orElse(descriptor.getStreamId());
-    if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
-      configs.put(TaskConfig.INPUT_STREAMS(),
-          configs.get(TaskConfig.INPUT_STREAMS()).concat("," + systemName + 
"." + streamName));
+    if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
+      configs.put(TaskConfig.INPUT_STREAMS,
+          configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "." 
+ streamName));
     } else {
-      configs.put(TaskConfig.INPUT_STREAMS(), systemName + "." + streamName);
+      configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
     }
     InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) 
descriptor.getSystemDescriptor();
     imsd.withInMemoryScope(this.inMemoryScope);
diff --git 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 777d219..81ac963 100644
--- 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -40,7 +40,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.coordinator.JobCoordinator;
@@ -197,7 +197,7 @@ public class TestZkStreamProcessorBase extends 
IntegrationTestHarness {
     configs.put("task.name.grouper.factory", 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.zk.ZkJobCoordinatorFactory");
-    configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+    configs.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
     configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
     configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
     configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index b571ae0..054556e 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -81,7 +81,7 @@ public class EndOfStreamIntegrationTest extends 
IntegrationTestHarness {
     configs.put(JobConfig.JOB_NAME(), "test-eos-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
     configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 63dc254..7431bd7 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.controlmessages;
 
+import org.apache.samza.config.TaskConfig;
 import scala.collection.JavaConverters;
 
 import java.util.ArrayList;
@@ -34,7 +35,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskName;
@@ -131,7 +131,7 @@ public class WatermarkIntegrationTest extends 
IntegrationTestHarness {
     configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
     configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
index 83b2b63..268f07c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -55,9 +55,9 @@ public class FaultInjectionTest extends 
StreamApplicationIntegrationTestHarness
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.zk.ZkJobCoordinatorFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views");
-    configs.put(TaskConfig.INPUT_STREAMS(), "kafka.page-views");
+    configs.put(TaskConfig.INPUT_STREAMS, "kafka.page-views");
     configs.put(ZkConfig.ZK_CONNECT, zkConnect());
     configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000");
 
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index 4afff92..4f5d510 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -82,7 +82,7 @@ public class TestAsyncFlatMap extends IntegrationTestHarness {
   @Test(expected = SamzaException.class)
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
-    configs.put(TaskConfig.CALLBACK_TIMEOUT_MS(), "100");
+    configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
     runTest(PAGE_VIEWS, configs);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index fc5e75a..cb81b89 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -83,7 +83,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "false");
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, 
inputTopicName1);
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, 
inputTopicName2);
@@ -112,7 +112,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "true");
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, 
inputTopicName1);
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, 
inputTopicName2);
@@ -161,7 +161,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
 
     initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 2f08fed..82533a0 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -62,7 +62,7 @@ public class TestRepartitionWindowApp extends 
StreamApplicationIntegrationTestHa
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 
     // run the application
     runApplication(new RepartitionWindowApp(), APP_NAME, configs);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 0c43674..7ad0632 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -49,7 +49,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
@@ -197,12 +196,12 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     String coordinatorSystemName = "coordinatorSystem";
     Map<String, String> config = new HashMap<>();
     config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
-    config.put(TaskConfig.INPUT_STREAMS(), 
Joiner.on(',').join(inputSystemStreams));
+    config.put(TaskConfig.INPUT_STREAMS, 
Joiner.on(',').join(inputSystemStreams));
     config.put(JobConfig.JOB_DEFAULT_SYSTEM(), 
TestZkLocalApplicationRunner.TEST_SYSTEM);
-    config.put(TaskConfig.IGNORED_EXCEPTIONS(), "*");
+    config.put(TaskConfig.IGNORED_EXCEPTIONS, "*");
     config.put(ZkConfig.ZK_CONNECT, zkConnect());
     config.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY);
-    config.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY);
+    config.put(TaskConfig.GROUPER_FACTORY, TEST_TASK_GROUPER_FACTORY);
     config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
TEST_JOB_COORDINATOR_FACTORY);
     config.put(ApplicationConfig.APP_NAME, appName);
     config.put(ApplicationConfig.APP_ID, appId);
@@ -210,8 +209,8 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     config.put(String.format("systems.%s.samza.factory", 
TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY);
     config.put(JobConfig.JOB_NAME(), appName);
     config.put(JobConfig.JOB_ID(), appId);
-    config.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
-    config.put(TaskConfig.DROP_PRODUCER_ERRORS(), "true");
+    config.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+    config.put(TaskConfig.DROP_PRODUCER_ERRORS, "true");
     config.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
     config.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000");
     config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true");
@@ -609,7 +608,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     Map<String, String> configMap = 
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), 
testStreamAppName, testStreamAppId);
-    configMap.put(TaskConfig.SHUTDOWN_MS(), "0");
+    configMap.put(TaskConfig.TASK_SHUTDOWN_MS, "0");
 
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new MapConfig(configMap);
@@ -642,7 +641,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
 
     // Reset the task shutdown ms for 3rd application to give it ample time to 
shutdown cleanly
-    configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS);
+    configMap.put(TaskConfig.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
     Config applicationConfig3 = new MapConfig(configMap);
 
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -951,7 +950,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     TaskApplication taskApplication = new TestTaskApplication(TEST_SYSTEM, 
inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, shutdownLatch);
     MapConfig taskApplicationConfig = new 
MapConfig(ImmutableList.of(applicationConfig1,
-        ImmutableMap.of(TaskConfig.MAX_CONCURRENCY(), "1", 
JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
+        ImmutableMap.of(TaskConfig.MAX_CONCURRENCY, "1", 
JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
     ApplicationRunner appRunner = 
ApplicationRunners.getApplicationRunner(taskApplication, taskApplicationConfig);
 
     // Run the application.
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index c3f0ba5..7760833 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -265,7 +265,7 @@ public class TestLocalTableEndToEnd extends 
IntegrationTestHarness {
     configs.put(JobConfig.JOB_NAME(), "test-table-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     // For intermediate streams
     configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 8982682..1263d46 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -44,6 +44,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task._
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -273,14 +274,14 @@ class StreamTaskTestUtil {
     val jobModel = new JobModel(mapConfig, containers)
     jobModel.maxChangeLogStreamPartitions = 1
 
-    val taskConfig = new TaskConfigJava(jobModel.getConfig)
-    val checkpointManager = Option(taskConfig.getCheckpointManager(new 
MetricsRegistryMap(), getClass.getClassLoader))
-    checkpointManager match {
-      case Some(checkpointManager) => {
-        checkpointManager.createResources
-        checkpointManager.stop
-      }
-      case _ => assert(checkpointManager != null, "No checkpoint manager 
factory configured")
+    val taskConfig = new TaskConfig(jobModel.getConfig)
+    val checkpointManagerOption = 
JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new 
MetricsRegistryMap(),
+      getClass.getClassLoader)).toOption
+    checkpointManagerOption match {
+      case Some(checkpointManager) =>
+        checkpointManager.createResources()
+        checkpointManager.stop()
+      case _ => throw new ConfigException("No checkpoint manager factory 
configured")
     }
 
     ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java 
b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index ab60156..f26d691 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -116,7 +116,7 @@ public class SamzaSqlConsole {
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
     staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
     String configIOResolverDomain =
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index 683ced4..c7703b0 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -70,7 +70,7 @@ public class SystemConsumerWithSamzaBench extends 
AbstractSamzaBench {
     props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
     
props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS, 
streamId),
         Joiner.on(",").join(partitions));
-    props.put(TaskConfig.GROUPER_FACTORY(), 
ConfigBasedSspGrouperFactory.class.getName());
+    props.put(TaskConfig.GROUPER_FACTORY, 
ConfigBasedSspGrouperFactory.class.getName());
   }
 
   public void start() throws IOException, InterruptedException {

Reply via email to