mynameborat commented on a change in pull request #1039: SAMZA-2197: [Scala cleanup] Convert TaskConfig to Java
URL: https://github.com/apache/samza/pull/1039#discussion_r286636175
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
 ##########
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.StreamUtil;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper for accessing config values related to tasks.
+ */
+public class TaskConfig extends MapConfig {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(TaskConfig.class);
+
+  // comma-separated list of system-streams
+  public static final String INPUT_STREAMS = "task.inputs";
+  // window period in milliseconds
+  public static final String WINDOW_MS = "task.window.ms";
+  static final long DEFAULT_WINDOW_MS = -1L;
+  // commit period in milliseconds
+  public static final String COMMIT_MS = "task.commit.ms";
+  static final long DEFAULT_COMMIT_MS = 60000L;
+  // how long to wait for a clean shutdown
+  public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+  static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
+  // legacy config for specifying task class; replaced by SamzaApplication and 
app.class
+  public static final String TASK_CLASS = "task.class";
+  // command builder to use for launching a Samza job
+  public static final String COMMAND_BUILDER = "task.command.class";
+  // message chooser for controlling stream consumption
+  public static final String MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class";
+  // define whether drop the messages or not when deserialization fails
+  public static final String DROP_DESERIALIZATION_ERRORS = 
"task.drop.deserialization.errors";
+  // define whether drop the messages or not when serialization fails
+  public static final String DROP_SERIALIZATION_ERRORS = 
"task.drop.serialization.errors";
+  // whether to ignore producer errors and drop the messages that failed to 
send
+  public static final String DROP_PRODUCER_ERRORS = 
"task.drop.producer.errors";
+  // exceptions to ignore in process and window
+  public static final String IGNORED_EXCEPTIONS = "task.ignored.exceptions";
+  // class name for task grouper
+  public static final String GROUPER_FACTORY = "task.name.grouper.factory";
+  // max number of concurrent process for a AsyncStreamTask
+  public static final String MAX_CONCURRENCY = "task.max.concurrency";
+  static final int DEFAULT_MAX_CONCURRENCY = 1;
+  // timeout period for triggering a callback
+  public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
+  static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+  // to enable async commit in a AsyncStreamTask
+  public static final String ASYNC_COMMIT = "task.async.commit";
+  // maximum time to wait for a task worker to complete when there are no new 
messages to handle
+  public static final String MAX_IDLE_MS = "task.max.idle.ms";
+  static final long DEFAULT_MAX_IDLE_MS = 10L;
+  /**
+   * Samza's container polls for more messages under two conditions. The first
+   * condition arises when there are simply no remaining buffered messages to
+   * process for any input SystemStreamPartition. The second condition arises
+   * when some input SystemStreamPartitions have empty buffers, but some do
+   * not. In the latter case, a polling interval is defined to determine how
+   * often to refresh the empty SystemStreamPartition buffers. By default,
+   * this interval is 50ms, which means that any empty SystemStreamPartition
+   * buffer will be refreshed at least every 50ms. A higher value here means
+   * that empty SystemStreamPartitions will be refreshed less often, which
+   * means more latency is introduced, but less CPU and network will be used.
+   * Decreasing this value means that empty SystemStreamPartitions are
+   * refreshed more frequently, thereby introducing less latency, but
+   * increasing CPU and network utilization.
+   */
+  public static final String POLL_INTERVAL_MS = "task.poll.interval.ms";
+  // broadcast streams consumed by all tasks. e.g. kafka.foo#1
+  public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
+  private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
+  private static final String BROADCAST_STREAM_RANGE_PATTERN = 
"^\\[[\\d]+\\-[\\d]+\\]$";
+  // class name to use when sending offset checkpoints
+  public static final String CHECKPOINT_MANAGER_FACTORY = 
"task.checkpoint.factory";
+
+  public TaskConfig(Config config) {
+    super(config);
+  }
+
+  public Set<SystemStream> getInputStreams() {
+    Optional<String> inputStreams = Optional.ofNullable(get(INPUT_STREAMS));
+    if (!inputStreams.isPresent() || inputStreams.get().isEmpty()) {
+      return Collections.emptySet();
+    } else {
+      return Stream.of(inputStreams.get().split(","))
+          .map(systemStreamNames -> 
StreamUtil.getSystemStreamFromNames(systemStreamNames.trim()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  public long getWindowMs() {
+    return getLong(WINDOW_MS, DEFAULT_WINDOW_MS);
+  }
+
+  public long getCommitMs() {
+    return getLong(COMMIT_MS, DEFAULT_COMMIT_MS);
+  }
+
+  public Optional<String> getTaskClass() {
+    return Optional.ofNullable(get(TASK_CLASS));
+  }
+
+  public String getCommandClass(String defaultCommandClass) {
+    return get(COMMAND_BUILDER, defaultCommandClass);
+  }
+
+  public Optional<String> getMessageChooserClass() {
+    return Optional.ofNullable(get(MESSAGE_CHOOSER_CLASS_NAME));
+  }
+
+  public boolean getDropDeserializationErrors() {
+    return getBoolean(DROP_DESERIALIZATION_ERRORS, false);
+  }
+
+  public boolean getDropSerializationErrors() {
+    return getBoolean(DROP_SERIALIZATION_ERRORS, false);
+  }
+
+  public boolean getDropProducerErrors() {
+    return getBoolean(DROP_PRODUCER_ERRORS, false);
+  }
+
+  public Optional<Integer> getPollIntervalMs() {
+    return Optional.ofNullable(get(POLL_INTERVAL_MS)).map(Integer::parseInt);
+  }
+
+  public Optional<String> getIgnoredExceptions() {
+    return Optional.ofNullable(get(IGNORED_EXCEPTIONS));
+  }
+
+  public String getTaskNameGrouperFactory() {
+    Optional<String> taskNameGrouperFactory = 
Optional.ofNullable(get(GROUPER_FACTORY));
+    if (taskNameGrouperFactory.isPresent()) {
+      return taskNameGrouperFactory.get();
+    } else {
+      LOGGER.info(String.format("No %s configuration, using %s", 
GROUPER_FACTORY,
+          GroupByContainerCountFactory.class.getName()));
+      return GroupByContainerCountFactory.class.getName();
+    }
+  }
+
+  public int getMaxConcurrency() {
+    return getInt(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
+  }
+
+  public long getCallbackTimeoutMs() {
+    return getLong(CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
+  }
+
+  public boolean getAsyncCommit() {
+    return getBoolean(ASYNC_COMMIT, false);
+  }
+
+  public long getMaxIdleMs() {
+    return getLong(MAX_IDLE_MS, DEFAULT_MAX_IDLE_MS);
+  }
+
+  /**
+   * Create the checkpoint manager
+   *
+   * @param metricsRegistry Registry of metrics to use. Can be null if not 
using metrics.
+   * @return CheckpointManager object if checkpoint manager factory is 
configured, otherwise empty.
+   */
+  public Optional<CheckpointManager> getCheckpointManager(MetricsRegistry 
metricsRegistry) {
 
 Review comment:
   This class feels overloaded: it both exposes config values and manages
   object instantiation (e.g. creating the CheckpointManager in
   getCheckpointManager). Would it be possible to separate these
   responsibilities as part of this PR? If not, can we create a cleanup
   ticket to track this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to