SAMZA-1868: Create new SamzaAdmin for Kafka

This request is a copy of #647 (which got garbled). This PR already addresses all the
comments brought up in the other request.

Author: Boris S <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: Shanthoosh Venkatraman <[email protected]>, Prateek Maheshwari 
<[email protected]>

Closes #662 from sborya/NewConsumerAdmin2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/63d33fa0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/63d33fa0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/63d33fa0

Branch: refs/heads/master
Commit: 63d33fa0617488d25a0d3fb061423271392d20f6
Parents: 3c78e06
Author: Boris S <[email protected]>
Authored: Thu Oct 11 14:26:51 2018 -0700
Committer: Boris S <[email protected]>
Committed: Thu Oct 11 14:26:51 2018 -0700

----------------------------------------------------------------------
 .../samza/application/ApplicationUtil.java      |   1 -
 .../org/apache/samza/config/SystemConfig.scala  |   2 +-
 .../samza/config/KafkaConsumerConfig.java       | 194 ++++++
 .../samza/system/kafka/KafkaSystemAdmin.java    | 665 +++++++++++++++++++
 .../samza/system/kafka/KafkaSystemConsumer.java | 366 ++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   |   5 +
 .../samza/config/KafkaConsumerConfig.java       | 201 ------
 .../samza/system/kafka/KafkaConsumerProxy.java  |   2 +
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 608 -----------------
 .../kafka/KafkaSystemAdminUtilsScala.scala      | 192 ++++++
 .../samza/system/kafka/KafkaSystemConsumer.java | 371 -----------
 .../samza/system/kafka/KafkaSystemFactory.scala |  63 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |  11 -
 .../samza/config/TestKafkaConsumerConfig.java   |  60 +-
 .../system/kafka/TestKafkaSystemAdminJava.java  | 185 ++++--
 .../kafka/TestKafkaSystemAdminWithMock.java     | 317 +++++++++
 .../system/kafka/TestKafkaSystemConsumer.java   | 225 +++++++
 .../kafka/TestKafkaSystemConsumerMetrics.java   | 109 +++
 .../system/kafka/TestKafkaSystemAdmin.scala     | 109 ++-
 .../system/kafka/TestKafkaSystemConsumer.java   | 220 ------
 .../operator/TestRepartitionJoinWindowApp.java  |  18 +-
 .../AbstractIntegrationTestHarness.scala        |  60 +-
 22 files changed, 2366 insertions(+), 1618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java 
b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index b39ad3c..f779619 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -59,5 +59,4 @@ public class ApplicationUtil {
     }
     return new LegacyTaskApplication(taskClassOption.get());
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index bebdbd8..00e65a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -50,7 +50,7 @@ class SystemConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getDefaultSystemOffset(systemName: String) = 
getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
 
-  def deleteCommittedMessages(systemName: String) = 
getOption(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName))
+  def deleteCommittedMessages(systemName: String) = 
getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false)
 
   /**
    * Returns a list of all system names from the config file. Useful for
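
Note: the accessor above now returns a plain boolean (default false) instead of an Option. A
minimal usage sketch from Java, mirroring how KafkaSystemAdmin below consumes it (the key name
systems.<system>.samza.delete.committed.messages is assumed from SystemConfig; imports of
java.util.Collections and org.apache.samza.config.{MapConfig, SystemConfig} elided):

    // illustrative only; the config key is an assumption, not taken from this diff
    SystemConfig systemConfig = new SystemConfig(new MapConfig(
        Collections.singletonMap("systems.kafka.samza.delete.committed.messages", "true")));
    boolean deleteEnabled = systemConfig.deleteCommittedMessages("kafka"); // true; false if unset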

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java 
b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..ad17e82
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends HashMap<String, Object> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+
+  private final String systemName;
+  /*
+   * By default, KafkaConsumer fetches a large number of available messages
+   * across all partitions, which may cause memory issues. We therefore limit
+   * the number of messages per partition returned on EACH poll().
+   */
+  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+
+  private KafkaConsumerConfig(Map<String, Object> props, String systemName) {
+    super(props);
+    this.systemName = systemName;
+  }
+
+  /**
+   * Create kafka consumer configs, based on the subset of global configs.
+   * @param config application config
+   * @param systemName system name
+   * @param clientId client id provided by the caller
+   * @return KafkaConsumerConfig
+   */
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config 
config, String systemName, String clientId) {
+
+    Config subConf = config.subset(String.format("systems.%s.consumer.", 
systemName), true);
+
+    final String groupId = createConsumerGroupId(config);
+
+    Map<String, Object> consumerProps = new HashMap<>();
+    consumerProps.putAll(subConf);
+
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+    // These are values we enforce in samza, and they cannot be overwritten.
+
+    // Disable consumer auto-commit because Samza controls commits
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+    // Translate samza config value to kafka config value
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue((String) 
consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+
+    // if consumer bootstrap servers are not configured, get them from the 
producer configs
+    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+      String bootstrapServers =
+          config.get(String.format("systems.%s.producer.%s", systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      if (StringUtils.isEmpty(bootstrapServers)) {
+        throw new SamzaException("Missing " + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
+      }
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+    }
+
+    // Always use default partition assignment strategy. Do not allow override.
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
RangeAssignor.class.getName());
+
+    // The consumer is fully typed, and deserialization can be configured explicitly.
+    // If no deserializers are provided, default to byte[].
+    if 
(!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting key serialization for the consumer(for system {}) to 
ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    }
+    if 
(!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting value serialization for the consumer(for system {}) to 
ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    }
+
+    // Set the default max.poll.records value if none was provided
+    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+
+    return new KafkaConsumerConfig(consumerProps, systemName);
+  }
+
+  public String getClientId() {
+    String clientId = (String) get(ConsumerConfig.CLIENT_ID_CONFIG);
+    if (StringUtils.isBlank(clientId)) {
+      throw new SamzaException("client Id is not set for consumer for system=" 
+ systemName);
+    }
+    return clientId;
+  }
+
+  // group id should be unique per job
+  static String createConsumerGroupId(Config config) {
+    Pair<String, String> jobNameId = getJobNameAndId(config);
+
+    return String.format("%s-%s", jobNameId.getLeft(), jobNameId.getRight());
+  }
+
+  // client id should be unique per job
+  public static String createClientId(String prefix, Config config) {
+
+    Pair<String, String> jobNameId = getJobNameAndId(config);
+    String jobName = jobNameId.getLeft();
+    String jobId = jobNameId.getRight();
+    return String.format("%s-%s-%s", prefix.replaceAll("\\W", "_"), 
jobName.replaceAll("\\W", "_"),
+        jobId.replaceAll("\\W", "_"));
+  }
+
+  public static Pair<String, String> getJobNameAndId(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option jobNameOption = jobConfig.getName();
+    if (jobNameOption.isEmpty()) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = (String) jobNameOption.get();
+    return new ImmutablePair<>(jobName, jobConfig.getJobId());
+  }
+
+  /**
+   * Samza's auto-offset-reset settings differ from Kafka's (auto.offset.reset),
+   * so they need to be converted (see kafka.apache.org/documentation):
+   * "largest" -> "latest"
+   * "smallest" -> "earliest"
+   *
+   * If no setting is specified we return "latest" (same as Kafka).
+   * @param autoOffsetReset value from the app-provided config
+   * @return String representing the config value for the "auto.offset.reset" property
+   */
+  static String getAutoOffsetResetValue(final String autoOffsetReset) {
+    final String SAMZA_OFFSET_LARGEST = "largest";
+    final String SAMZA_OFFSET_SMALLEST = "smallest";
+    final String KAFKA_OFFSET_LATEST = "latest";
+    final String KAFKA_OFFSET_EARLIEST = "earliest";
+    final String KAFKA_OFFSET_NONE = "none";
+
+    if (autoOffsetReset == null) {
+      return KAFKA_OFFSET_LATEST; // return default
+    }
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || 
autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
+    String newAutoOffsetReset;
+    switch (autoOffsetReset) {
+      case SAMZA_OFFSET_LARGEST:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+        break;
+      case SAMZA_OFFSET_SMALLEST:
+        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+        break;
+      default:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    }
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, 
newAutoOffsetReset);
+    return newAutoOffsetReset;
+  }
+}
\ No newline at end of file
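
To make the class above concrete, a minimal sketch of building a consumer config (the job name,
system name, client id, and broker address are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.KafkaConsumerConfig;
    import org.apache.samza.config.MapConfig;

    public class ConsumerConfigExample {
      public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("job.name", "page-view-counter"); // group id becomes "<job.name>-<job.id>"
        raw.put("systems.kafka.consumer.bootstrap.servers", "localhost:9092");
        raw.put("systems.kafka.consumer.auto.offset.reset", "smallest"); // Samza-style value

        Config config = new MapConfig(raw);
        KafkaConsumerConfig consumerConfig =
            KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, "kafka", "client-1");

        // "smallest" is translated to Kafka's "earliest", enable.auto.commit is
        // forced to "false", and max.poll.records defaults to "100".
        System.out.println(consumerConfig.get("auto.offset.reset")); // prints "earliest"
      }
    }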

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
new file mode 100644
index 0000000..cb2db10
--- /dev/null
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import kafka.admin.AdminClient;
+import kafka.utils.ZkUtils;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.system.ExtendedSystemAdmin;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function0;
+import scala.Function1;
+import scala.Function2;
+import scala.collection.JavaConverters;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+import scala.runtime.BoxedUnit;
+
+import static org.apache.samza.config.KafkaConsumerConfig.*;
+
+
+public class KafkaSystemAdmin implements ExtendedSystemAdmin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSystemAdmin.class);
+
+  // Default exponential sleep strategy values
+  protected static final double DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER 
= 2.0;
+  protected static final long DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS = 500;
+  protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000;
+  protected static final int MAX_RETRIES_ON_EXCEPTION = 5;
+  protected static final int DEFAULT_REPL_FACTOR = 2;
+
+  // used in TestRepartitionJoinWindowApp TODO - remove SAMZA-1945
+  @VisibleForTesting
+  public static volatile boolean deleteMessageCalled = false;
+
+  protected final String systemName;
+  protected final Consumer metadataConsumer;
+
+  // get ZkUtils object to connect to Kafka's ZK.
+  private final Supplier<ZkUtils> getZkConnection;
+
+  // Custom properties to create a new coordinator stream.
+  private final Properties coordinatorStreamProperties;
+
+  // Replication factor for a new coordinator stream.
+  private final int coordinatorStreamReplicationFactor;
+
+  // Replication factor and kafka properties for changelog topic creation
+  private final Map<String, ChangelogInfo> changelogTopicMetaInformation;
+
+  // Kafka properties for intermediate topics creation
+  private final Map<String, Properties> intermediateStreamProperties;
+
+  // adminClient is required for deleteCommittedMessages operation
+  private final AdminClient adminClient;
+
+  // used for intermediate streams
+  private final boolean deleteCommittedMessages;
+
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  public KafkaSystemAdmin(String systemName, Config config, Consumer 
metadataConsumer) {
+    this.systemName = systemName;
+
+    if (metadataConsumer == null) {
+      throw new SamzaException(
+          "Cannot construct KafkaSystemAdmin for system " + systemName + " 
with null metadataConsumer");
+    }
+    this.metadataConsumer = metadataConsumer;
+
+    // populate brokerList from either consumer or producer configs
+    Properties props = new Properties();
+    String brokerList = config.get(
+        String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    if (brokerList == null) {
+      brokerList = 
config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
+          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+    if (brokerList == null) {
+      throw new SamzaException(
+          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for 
systemAdmin for system " + systemName);
+    }
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+
+    // kafka.admin.AdminUtils requires zkConnect
+    // this will change after we move to the new org.apache.kafka.clients.admin.AdminClient
+    String zkConnect =
+        config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), 
systemName, ZOOKEEPER_CONNECT));
+    if (StringUtils.isBlank(zkConnect)) {
+      throw new SamzaException("Missing zookeeper.connect config for admin for 
system " + systemName);
+    }
+    props.put(ZOOKEEPER_CONNECT, zkConnect);
+
+    adminClient = AdminClient.create(props);
+
+    getZkConnection = () -> {
+      return ZkUtils.apply(zkConnect, 6000, 6000, false);
+    };
+
+    KafkaConfig kafkaConfig = new KafkaConfig(config);
+    coordinatorStreamReplicationFactor = 
Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
+    coordinatorStreamProperties = 
KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
+
+    Map<String, String> storeToChangelog =
+        
JavaConverters.mapAsJavaMapConverter(kafkaConfig.getKafkaChangelogEnabledStores()).asJava();
+    // Construct the meta information for each topic, if the replication 
factor is not defined,
+    // we use 2 (DEFAULT_REPL_FACTOR) as the number of replicas for the change 
log stream.
+    changelogTopicMetaInformation = new HashMap<>();
+    for (Map.Entry<String, String> e : storeToChangelog.entrySet()) {
+      String storeName = e.getKey();
+      String topicName = e.getValue();
+      String replicationFactorStr = 
kafkaConfig.getChangelogStreamReplicationFactor(storeName);
+      int replicationFactor =
+          StringUtils.isEmpty(replicationFactorStr) ? DEFAULT_REPL_FACTOR : 
Integer.valueOf(replicationFactorStr);
+      ChangelogInfo changelogInfo =
+          new ChangelogInfo(replicationFactor, 
kafkaConfig.getChangelogKafkaProperties(storeName));
+      LOG.info(String.format("Creating topic meta information for topic: %s 
with replication factor: %s", topicName,
+          replicationFactor));
+      changelogTopicMetaInformation.put(topicName, changelogInfo);
+    }
+
+    // special flag to allow/enforce deleting of committed messages
+    SystemConfig systemConfig = new SystemConfig(config);
+    this.deleteCommittedMessages = 
systemConfig.deleteCommittedMessages(systemName);
+
+    intermediateStreamProperties =
+        
JavaConverters.mapAsJavaMapConverter(KafkaSystemAdminUtilsScala.getIntermediateStreamProperties(config))
+            .asJava();
+
+    LOG.info(String.format("Created KafkaSystemAdmin for system %s", 
systemName));
+  }
+
+  @Override
+  public void start() {
+    // Please note: there is a slight inconsistency in the use of this class.
+    // Some of its functionality may actually be used BEFORE start() is called.
+    // The SamzaContainer gets metadata (using this class) in SamzaContainer.apply,
+    // but this "start" actually gets called in SamzaContainer.run.
+    // Review this usage (SAMZA-1888).
+
+    // Throw exception if start is called after stop
+    if (stopped.get()) {
+      throw new IllegalStateException("SamzaKafkaAdmin.start() is called after 
stop()");
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (stopped.compareAndSet(false, true)) {
+      try {
+        metadataConsumer.close();
+      } catch (Exception e) {
+        LOG.warn("metadataConsumer.close for system " + systemName + " failed 
with exception.", e);
+      }
+      try {
+        adminClient.close();
+      } catch (Exception e) {
+        LOG.warn("adminClient.close for system " + systemName + " failed with 
exception.", e);
+      }
+    }
+  }
+
+  /**
+   * Note! This method does not populate SystemStreamMetadata for each stream with real data.
+   * Thus, it should ONLY be used to get the number of partitions for each stream.
+   * It will throw NotImplementedException if anyone tries to access the actual metadata.
+   * @param streamNames set of streams for which to get the partition counts
+   * @param cacheTTL cache TTL if caching the data
+   * @return a map keyed on stream name; only the number of partitions in each
+   *         SystemStreamMetadata is meaningful.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> 
getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
+    // This optimization omits actual metadata for performance. Instead, we 
inject a dummy for all partitions.
+    final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm =
+        new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, 
null) {
+          String msg =
+              "getSystemStreamPartitionCounts does not populate 
SystemStreaMetadata info. Only number of partitions";
+
+          @Override
+          public String getOldestOffset() {
+            throw new NotImplementedException(msg);
+          }
+
+          @Override
+          public String getNewestOffset() {
+            throw new NotImplementedException(msg);
+          }
+
+          @Override
+          public String getUpcomingOffset() {
+            throw new NotImplementedException(msg);
+          }
+        };
+
+    ExponentialSleepStrategy strategy = new 
ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+        DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, 
DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
+
+    Function1<ExponentialSleepStrategy.RetryLoop, Map<String, 
SystemStreamMetadata>> fetchMetadataOperation =
+        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, 
SystemStreamMetadata>>() {
+          @Override
+          public Map<String, SystemStreamMetadata> 
apply(ExponentialSleepStrategy.RetryLoop loop) {
+            Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
+
+            streamNames.forEach(streamName -> {
+              Map<Partition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new 
HashMap<>();
+
+              List<PartitionInfo> partitionInfos = 
metadataConsumer.partitionsFor(streamName);
+              LOG.debug("Stream {} has partitions {}", streamName, 
partitionInfos);
+
+              partitionInfos.forEach(partitionInfo -> {
+                partitionMetadata.put(new 
Partition(partitionInfo.partition()), dummySspm);
+              });
+
+              allMetadata.put(streamName, new SystemStreamMetadata(streamName, 
partitionMetadata));
+            });
+
+            loop.done();
+            return allMetadata;
+          }
+        };
+
+    Map<String, SystemStreamMetadata> result = 
strategy.run(fetchMetadataOperation,
+        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, 
BoxedUnit>() {
+          @Override
+          public BoxedUnit apply(Exception exception, 
ExponentialSleepStrategy.RetryLoop loop) {
+            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+              LOG.warn(String.format("Fetching systemstreampartition counts 
for: %s threw an exception. Retrying.",
+                  streamNames), exception);
+            } else {
+              LOG.error(String.format("Fetching systemstreampartition counts 
for: %s threw an exception.", streamNames),
+                  exception);
+              loop.done();
+              throw new SamzaException(exception);
+            }
+            return null;
+          }
+        }).get();
+
+    LOG.info("SystemStream partition counts for system {}: {}", systemName, 
result);
+    return result;
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> 
getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    // This is safe to do with Kafka, even if a topic is key-deduped. If the
+    // offset doesn't exist on a compacted topic, Kafka will return the first
+    // message AFTER the offset that was specified in the fetch request.
+    return offsets.entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> 
String.valueOf(Long.valueOf(entry.getValue()) + 1)));
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames) {
+    return getSystemStreamMetadata(streamNames,
+        new 
ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+            DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, 
DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
+  }
+
+  @Override
+  public Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
+      Set<SystemStreamPartition> ssps) {
+
+    LOG.info("Fetching SSP metadata for: {}", ssps);
+    List<TopicPartition> topicPartitions = ssps.stream()
+        .map(ssp -> new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId()))
+        .collect(Collectors.toList());
+
+    OffsetsMaps topicPartitionsMetadata = 
fetchTopicPartitionsMetadata(topicPartitions);
+
+    Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new 
HashMap<>();
+    for (SystemStreamPartition ssp : ssps) {
+      String oldestOffset = 
topicPartitionsMetadata.getOldestOffsets().get(ssp);
+      String newestOffset = 
topicPartitionsMetadata.getNewestOffsets().get(ssp);
+      String upcomingOffset = 
topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+
+      sspToSSPMetadata.put(ssp,
+          new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, 
newestOffset, upcomingOffset));
+    }
+    return sspToSSPMetadata;
+  }
+
+  /**
+   * Given a set of stream names (topics), fetch metadata from Kafka for each
+   * stream, and return a map from stream name to SystemStreamMetadata for
+   * each stream. This method will return null for oldest and newest offsets
+   * if a given SystemStreamPartition is empty. This method will block and
+   * retry indefinitely until it gets a successful response from Kafka.
+   *
+   * @param streamNames a set of strings of stream names/topics
+   * @param retryBackoff retry backoff strategy
+   * @return a map from topic to SystemStreamMetadata which has offsets for 
each partition
+   */
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames,
+      ExponentialSleepStrategy retryBackoff) {
+
+    LOG.info("Fetching system stream metadata for {} from system {}", 
streamNames, systemName);
+
+    Function1<ExponentialSleepStrategy.RetryLoop, Map<String, 
SystemStreamMetadata>> fetchMetadataOperation =
+        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, 
SystemStreamMetadata>>() {
+          @Override
+          public Map<String, SystemStreamMetadata> 
apply(ExponentialSleepStrategy.RetryLoop loop) {
+            Map<String, SystemStreamMetadata> metadata = 
fetchSystemStreamMetadata(streamNames);
+            loop.done();
+            return metadata;
+          }
+        };
+
+    Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> 
onExceptionRetryOperation =
+        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, 
BoxedUnit>() {
+          @Override
+          public BoxedUnit apply(Exception exception, 
ExponentialSleepStrategy.RetryLoop loop) {
+            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+              LOG.warn(
+                  String.format("Fetching system stream metadata for: %s threw 
an exception. Retrying.", streamNames),
+                  exception);
+            } else {
+              LOG.error(String.format("Fetching system stream metadata for: %s 
threw an exception.", streamNames),
+                  exception);
+              loop.done();
+              throw new SamzaException(exception);
+            }
+
+            return null;
+          }
+        };
+
+    Function0<Map<String, SystemStreamMetadata>> fallbackOperation =
+        new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
+          @Override
+          public Map<String, SystemStreamMetadata> apply() {
+            throw new SamzaException("Failed to get system stream metadata");
+          }
+        };
+
+    Map<String, SystemStreamMetadata> result =
+        retryBackoff.run(fetchMetadataOperation, 
onExceptionRetryOperation).getOrElse(fallbackOperation);
+    return result;
+  }
+
+  @Override
+  public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) 
{
+    LOG.info("Fetching newest offset for: {}", ssp);
+
+    ExponentialSleepStrategy strategy = new 
ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+        DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, 
DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
+
+    Function1<ExponentialSleepStrategy.RetryLoop, String> fetchNewestOffset =
+        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() {
+          @Override
+          public String apply(ExponentialSleepStrategy.RetryLoop loop) {
+            String result = fetchNewestOffset(ssp);
+            loop.done();
+            return result;
+          }
+        };
+
+    String offset = strategy.run(fetchNewestOffset,
+        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, 
BoxedUnit>() {
+          @Override
+          public BoxedUnit apply(Exception exception, 
ExponentialSleepStrategy.RetryLoop loop) {
+            if (loop.sleepCount() < maxRetries) {
+              LOG.warn(String.format("Fetching newest offset for: %s threw an 
exception. Retrying.", ssp), exception);
+            } else {
+              LOG.error(String.format("Fetching newest offset for: %s threw an 
exception.", ssp), exception);
+              loop.done();
+              throw new SamzaException("Exception while trying to get newest 
offset", exception);
+            }
+            return null;
+          }
+        }).get();
+
+    return offset;
+  }
+
+  /**
+   * Convert TopicPartition to SystemStreamPartition
+   * @param topicPartition the topic partition to be created
+   * @return an instance of SystemStreamPartition
+   */
+  private SystemStreamPartition toSystemStreamPartition(TopicPartition 
topicPartition) {
+    String topic = topicPartition.topic();
+    Partition partition = new Partition(topicPartition.partition());
+    return new SystemStreamPartition(systemName, topic, partition);
+  }
+
+  /**
+   * Uses {@code metadataConsumer} to fetch the metadata for the {@code 
topicPartitions}.
+   * Warning: If multiple threads call this with the same {@code 
metadataConsumer}, then this will not protect against
+   * concurrent access to the {@code metadataConsumer}.
+   */
+  private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> 
topicPartitions) {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+    Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
+    Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
+
+    Map<TopicPartition, Long> oldestOffsetsWithLong = 
metadataConsumer.beginningOffsets(topicPartitions);
+    LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
+    Map<TopicPartition, Long> upcomingOffsetsWithLong = 
metadataConsumer.endOffsets(topicPartitions);
+    LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
+
+    oldestOffsetsWithLong.forEach((topicPartition, offset) -> {
+      oldestOffsets.put(toSystemStreamPartition(topicPartition), 
String.valueOf(offset));
+    });
+
+    upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
+      upcomingOffsets.put(toSystemStreamPartition(topicPartition), 
String.valueOf(offset));
+
+      // Kafka's beginning offset corresponds to the offset of the oldest message.
+      // Kafka's end offset corresponds to the offset of the upcoming message; it is the newest offset + 1.
+      // When the upcoming offset is <= 0 the topic appears empty: we set the oldest offset to 0 and leave the newest offset null.
+      // When the upcoming offset is > 0 we subtract one from it to get the newest offset.
+      // In the normal case the newest offset corresponds to the offset of the newest message in the stream,
+      // but for a big message this does not hold: seeking to the newest offset yields nothing for the newest big message.
+      // For now we keep the newest-offset semantics the same as the historical metadata structure.
+      if (offset <= 0) {
+        LOG.warn(
+            "Empty Kafka topic partition {} with upcoming offset {}. Skipping 
newest offset and setting oldest offset to 0 to consume from beginning",
+            topicPartition, offset);
+        oldestOffsets.put(toSystemStreamPartition(topicPartition), "0");
+      } else {
+        newestOffsets.put(toSystemStreamPartition(topicPartition), 
String.valueOf(offset - 1));
+      }
+    });
+    return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets);
+  }
+
+  /**
+   * Fetch SystemStreamMetadata for each topic with the consumer
+   * @param topics set of topics to get metadata info for
+   * @return map of topic to SystemStreamMetadata
+   */
+  private Map<String, SystemStreamMetadata> 
fetchSystemStreamMetadata(Set<String> topics) {
+    Map<SystemStreamPartition, String> allOldestOffsets = new HashMap<>();
+    Map<SystemStreamPartition, String> allNewestOffsets = new HashMap<>();
+    Map<SystemStreamPartition, String> allUpcomingOffsets = new HashMap<>();
+
+    LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", 
topics, systemName);
+
+    topics.forEach(topic -> {
+      List<PartitionInfo> partitionInfos = 
metadataConsumer.partitionsFor(topic);
+
+      if (partitionInfos == null) {
+        String msg = String.format("Partition info not(yet?) available for 
system %s topic %s", systemName, topic);
+        throw new SamzaException(msg);
+      }
+
+      List<TopicPartition> topicPartitions = partitionInfos.stream()
+          .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition()))
+          .collect(Collectors.toList());
+
+      OffsetsMaps offsetsForTopic = 
fetchTopicPartitionsMetadata(topicPartitions);
+      allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
+      allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
+      allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
+    });
+
+    scala.collection.immutable.Map<String, SystemStreamMetadata> result =
+        
KafkaSystemAdminUtilsScala.assembleMetadata(ScalaJavaUtil.toScalaMap(allOldestOffsets),
+            ScalaJavaUtil.toScalaMap(allNewestOffsets), 
ScalaJavaUtil.toScalaMap(allUpcomingOffsets));
+
+    LOG.debug("assembled SystemStreamMetadata is: {}", result);
+    return JavaConverters.mapAsJavaMapConverter(result).asJava();
+  }
+
+  private String fetchNewestOffset(SystemStreamPartition ssp) {
+    LOG.debug("Fetching newest offset for {}", ssp);
+    String newestOffset;
+
+    TopicPartition topicPartition = new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
+
+    // the offsets returned from the consumer are of type Long
+    Long upcomingOffset =
+        (Long) 
metadataConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+
+    // Kafka's "latest" offset is always last message in stream's offset + 1,
+    // so get newest message in stream by subtracting one. This is safe
+    // even for key-deduplicated streams, since the last message will
+    // never be deduplicated.
+    if (upcomingOffset <= 0) {
+      LOG.debug("Stripping newest offsets for {} because the topic appears 
empty.", topicPartition);
+      newestOffset = null;
+    } else {
+      newestOffset = String.valueOf(upcomingOffset - 1);
+    }
+
+    LOG.info("Newest offset for ssp {} is: {}", ssp, newestOffset);
+    return newestOffset;
+  }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (offset1 == null || offset2 == null) {
+      return -1;
+    }
+
+    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+  }
+
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    LOG.info("Creating Kafka topic: {} on system: {}", 
streamSpec.getPhysicalName(), streamSpec.getSystemName());
+
+    return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), 
getZkConnection);
+  }
+
+  @Override
+  public boolean clearStream(StreamSpec streamSpec) {
+    LOG.info("Creating Kafka topic: {} on system: {}", 
streamSpec.getPhysicalName(), streamSpec.getSystemName());
+
+    KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection);
+
+    Map<String, List<PartitionInfo>> topicsMetadata = 
getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
+    return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
+  }
+
+  /**
+   * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog streams.
+   * @param spec a StreamSpec object
+   * @return KafkaStreamSpec object
+   */
+  KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
+    KafkaStreamSpec kafkaSpec;
+    if (spec.isChangeLogStream()) {
+      String topicName = spec.getPhysicalName();
+      ChangelogInfo topicMeta = changelogTopicMetaInformation.get(topicName);
+      if (topicMeta == null) {
+        throw new StreamValidationException("Unable to find topic information 
for topic " + topicName);
+      }
+
+      kafkaSpec = new KafkaStreamSpec(spec.getId(), topicName, systemName, 
spec.getPartitionCount(),
+          topicMeta.replicationFactor(), topicMeta.kafkaProps());
+    } else if (spec.isCoordinatorStream()) {
+      kafkaSpec =
+          new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), 
systemName, 1, coordinatorStreamReplicationFactor,
+              coordinatorStreamProperties);
+    } else if (intermediateStreamProperties.containsKey(spec.getId())) {
+      kafkaSpec = 
KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties.get(spec.getId()));
+    } else {
+      kafkaSpec = KafkaStreamSpec.fromSpec(spec);
+    }
+    return kafkaSpec;
+  }
+
+  @Override
+  public void validateStream(StreamSpec streamSpec) throws 
StreamValidationException {
+    LOG.info("About to validate stream = " + streamSpec);
+
+    String streamName = streamSpec.getPhysicalName();
+    SystemStreamMetadata systemStreamMetadata =
+        
getSystemStreamMetadata(Collections.singleton(streamName)).get(streamName);
+    if (systemStreamMetadata == null) {
+      throw new StreamValidationException(
+          "Failed to obtain metadata for stream " + streamName + ". Validation 
failed.");
+    }
+
+    int actualPartitionCounter = 
systemStreamMetadata.getSystemStreamPartitionMetadata().size();
+    int expectedPartitionCounter = streamSpec.getPartitionCount();
+    LOG.info("actualCount=" + actualPartitionCounter + "; expectedCount=" + 
expectedPartitionCounter);
+    if (actualPartitionCounter != expectedPartitionCounter) {
+      throw new StreamValidationException(
+          String.format("Mismatch of partitions for stream %s. Expected %d, 
got %d. Validation failed.", streamName,
+              expectedPartitionCounter, actualPartitionCounter));
+    }
+  }
+
+  // get partition info for topic
+  Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
+    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
+    List<PartitionInfo> partitionInfoList;
+    for (String topic : topics) {
+      partitionInfoList = metadataConsumer.partitionsFor(topic);
+      streamToPartitionsInfo.put(topic, partitionInfoList);
+    }
+
+    return streamToPartitionsInfo;
+  }
+
+  /**
+   * Delete records up to (and including) the provided ssp offsets for
+   * all system stream partitions specified in the map.
+   * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
+   * @param offsets specifies up to what offsets the messages should be deleted
+   */
+  @Override
+  public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
+    if (deleteCommittedMessages) {
+      KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
+      deleteMessageCalled = true;
+    }
+  }
+
+  /**
+   * Container for metadata about offsets.
+   */
+  private static class OffsetsMaps {
+    private final Map<SystemStreamPartition, String> oldestOffsets;
+    private final Map<SystemStreamPartition, String> newestOffsets;
+    private final Map<SystemStreamPartition, String> upcomingOffsets;
+
+    private OffsetsMaps(Map<SystemStreamPartition, String> oldestOffsets,
+        Map<SystemStreamPartition, String> newestOffsets, 
Map<SystemStreamPartition, String> upcomingOffsets) {
+      this.oldestOffsets = oldestOffsets;
+      this.newestOffsets = newestOffsets;
+      this.upcomingOffsets = upcomingOffsets;
+    }
+
+    private Map<SystemStreamPartition, String> getOldestOffsets() {
+      return oldestOffsets;
+    }
+
+    private Map<SystemStreamPartition, String> getNewestOffsets() {
+      return newestOffsets;
+    }
+
+    private Map<SystemStreamPartition, String> getUpcomingOffsets() {
+      return upcomingOffsets;
+    }
+  }
+}
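
To keep the offset conventions in fetchTopicPartitionsMetadata and getOffsetsAfter above
straight, a small illustrative sketch (the numbers are made up):

    // Kafka's endOffsets() returns the *upcoming* offset: one past the newest message.
    long upcoming = 42L; // as returned by metadataConsumer.endOffsets(...)
    String newest = (upcoming <= 0) ? null : String.valueOf(upcoming - 1); // "41"; null for an empty partition

    // getOffsetsAfter: the offset to read next is the checkpointed offset + 1.
    String checkpointed = "41";
    String readNext = String.valueOf(Long.valueOf(checkpointed) + 1); // "42"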

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
new file mode 100644
index 0000000..65d0e42
--- /dev/null
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -0,0 +1,366 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements 
SystemConsumer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSystemConsumer.class);
+
+  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD_BYTES = -1L;
+
+  protected final Consumer<K, V> kafkaConsumer;
+  protected final String systemName;
+  protected final String clientId;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Config config;
+  private final boolean fetchThresholdBytesEnabled;
+  private final KafkaSystemConsumerMetrics metrics;
+
+  // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
+  final KafkaConsumerMessageSink messageSink;
+
+  // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
+  // the BlockingEnvelopeMap's buffers.
+
+  // keep registration data until the start - mapping between registered SSPs 
and topicPartitions, and their offsets
+  final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
+  final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new 
HashMap<>();
+
+  long perPartitionFetchThreshold;
+  long perPartitionFetchThresholdBytes;
+
+  /**
+   * Create a KafkaSystemConsumer for the provided {@code systemName}
+   * @param kafkaConsumer the Kafka consumer this system consumer wraps
+   * @param systemName system name for which we create the consumer
+   * @param config application config
+   * @param clientId client id for this consumer
+   * @param metrics metrics for this KafkaSystemConsumer
+   * @param clock system clock
+   */
+  public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, 
Config config, String clientId,
+      KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+    super(metrics.registry(), clock, metrics.getClass().getName());
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.clientId = clientId;
+    this.systemName = systemName;
+    this.config = config;
+    this.metrics = metrics;
+
+    fetchThresholdBytesEnabled = new 
KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+
+    // create a sink for passing the messages between the proxy and the 
consumer
+    messageSink = new KafkaConsumerMessageSink();
+
+    // Create the proxy to do the actual message reading.
+    String metricName = String.format("%s-%s", systemName, clientId);
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink, metrics, metricName);
+    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
+  }
+
+  /**
+   * Create internal kafka consumer object, which will be used in the Proxy.
+   * @param systemName system name for which we create the consumer
+   * @param kafkaConsumerConfig config object for Kafka's KafkaConsumer
+   * @return KafkaConsumer object
+   */
+  public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String 
systemName,
+      HashMap<String, Object> kafkaConsumerConfig) {
+
+    LOG.info("Instantiating KafkaConsumer for systemName {} with properties 
{}", systemName, kafkaConsumerConfig);
+    return new KafkaConsumer<>(kafkaConsumerConfig);
+  }
+
+  @Override
+  public void start() {
+    if (!started.compareAndSet(false, true)) {
+      LOG.warn("{}: Attempting to start the consumer for the second (or more) 
time.", this);
+      return;
+    }
+    if (stopped.get()) {
+      LOG.error("{}: Attempting to start a stopped consumer", this);
+      return;
+    }
+    // initialize the subscriptions for all the registered TopicPartitions
+    startSubscription();
+    // needs to be called after all the registrations are completed
+    setFetchThresholds();
+
+    startConsumer();
+    LOG.info("{}: Consumer started", this);
+  }
+
+  private void startSubscription() {
+    //subscribe to all the registered TopicPartitions
+    LOG.info("{}: Consumer subscribes to {}", this, 
topicPartitionsToSSP.keySet());
+    try {
+      synchronized (kafkaConsumer) {
+        // we are using assign (and not subscribe), so we need to specify both 
topic and partition
+        kafkaConsumer.assign(topicPartitionsToSSP.keySet());
+      }
+    } catch (Exception e) {
+      throw new SamzaException("Consumer subscription failed for " + this, e);
+    }
+  }
+
+  /**
+   * Set the offsets to start from.
+   * Register the TopicPartitions with the proxy.
+   * Start the proxy.
+   */
+  void startConsumer() {
+    // set the offset for each TopicPartition
+    if (topicPartitionsToOffset.isEmpty()) {
+      LOG.error("{}: Consumer is not subscribed to any SSPs", this);
+    }
+
+    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
+      long startingOffset = Long.valueOf(startingOffsetString);
+
+      try {
+        synchronized (kafkaConsumer) {
+          kafkaConsumer.seek(tp, startingOffset); // this value should already 
be the 'upcoming' value
+        }
+      } catch (Exception e) {
+        // all recoverable exceptions are handled by the client.
+        // if we get here there is nothing left to do but bail out.
+        String msg =
+            String.format("%s: Got Exception while seeking to %s for partition 
%s", this, startingOffsetString, tp);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", 
this, tp, startingOffsetString);
+
+      // add the partition to the proxy
+      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+    });
+
+    // start the proxy thread
+    if (proxy != null && !proxy.isRunning()) {
+      LOG.info("{}: Starting proxy {}", this, proxy);
+      proxy.start();
+    }
+  }
+
+  private void setFetchThresholds() {
+    // get the thresholds, and set defaults if not defined.
+    KafkaConfig kafkaConfig = new KafkaConfig(config);
+
+    Option<String> fetchThresholdOption = 
kafkaConfig.getConsumerFetchThreshold(systemName);
+    long fetchThreshold = FETCH_THRESHOLD;
+    if (fetchThresholdOption.isDefined()) {
+      fetchThreshold = Long.valueOf(fetchThresholdOption.get());
+    }
+
+    Option<String> fetchThresholdBytesOption = 
kafkaConfig.getConsumerFetchThresholdBytes(systemName);
+    long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
+    if (fetchThresholdBytesOption.isDefined()) {
+      fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
+    }
+
+    int numPartitions = topicPartitionsToSSP.size();
+    if (numPartitions != topicPartitionsToOffset.size()) {
+      throw new SamzaException("topicPartitionsToSSP.size() doesn't match 
topicPartitionsToOffset.size()");
+    }
+
+
+    if (numPartitions > 0) {
+      perPartitionFetchThreshold = fetchThreshold / numPartitions;
+      if (fetchThresholdBytesEnabled) {
+        // currently this feature cannot be enabled, because we do not have 
the size of the messages available.
+        // messages get double buffered, hence divide by 2
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / 
numPartitions;
+      }
+    }
+    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; 
numPartitions={}, perPartitionFetchThreshold={}, 
perPartitionFetchThresholdBytes(0 if disabled)={}",
+        this, fetchThresholdBytes, fetchThreshold, numPartitions, 
perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
+  }
+
+  @Override
+  public void stop() {
+    if (!stopped.compareAndSet(false, true)) {
+      LOG.warn("{}: Attempting to stop stopped consumer.", this);
+      return;
+    }
+
+    LOG.info("{}: Stopping Samza kafkaConsumer ", this);
+
+    // stop the proxy (with 1 minute timeout)
+    if (proxy != null) {
+      LOG.info("{}: Stopping proxy {}", this, proxy);
+      proxy.stop(TimeUnit.SECONDS.toMillis(60));
+    }
+
+    try {
+      synchronized (kafkaConsumer) {
+        LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer);
+        kafkaConsumer.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
+    }
+  }
+
+  /**
+   * Record the ssp and the offset. Do not submit it to the consumer yet.
+   * @param systemStreamPartition ssp to register
+   * @param offset offset to register with
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
+    if (started.get()) {
+      String msg = String.format("%s: Trying to register partition after 
consumer has been started. ssp=%s", this,
+          systemStreamPartition);
+      throw new SamzaException(msg);
+    }
+
+    if (!systemStreamPartition.getSystem().equals(systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't 
match.", this, systemStreamPartition);
+      return;
+    }
+    LOG.info("{}: Registering ssp = {} with offset {}", this, 
systemStreamPartition, offset);
+
+    super.register(systemStreamPartition, offset);
+
+    TopicPartition tp = toTopicPartition(systemStreamPartition);
+
+    topicPartitionsToSSP.put(tp, systemStreamPartition);
+
+    String existingOffset = topicPartitionsToOffset.get(tp);
+    // register the older (of the two) offset in the consumer, to guarantee we 
do not miss any messages.
+    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+      topicPartitionsToOffset.put(tp, offset);
+    }
+
+    metrics.registerTopicAndPartition(toTopicAndPartition(tp));
+  }
+
+  /**
+   * Compare two String offsets.
+   * Note: KafkaSystemAdmin has a method that does this, but using it would require instantiating a system admin for each consumer.
+   * @return see {@link Long#compareTo(Long)}
+   */
+  private static int compareOffsets(String offset1, String offset2) {
+    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s:%s", systemName, clientId);
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws 
InterruptedException {
+
+    // check if the proxy is running
+    if (!proxy.isRunning()) {
+      stop();
+      String message = String.format("%s: KafkaConsumerProxy has stopped.", 
this);
+      throw new SamzaException(message, proxy.getFailureCause());
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  /**
+   * convert from TopicPartition to TopicAndPartition
+   */
+  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+    return new TopicAndPartition(tp.topic(), tp.partition());
+  }
+
+  /**
+   * Convert a SystemStreamPartition to a TopicPartition.
+   */
+  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+    return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+  }
+
+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public class KafkaConsumerMessageSink {
+
+    public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
+      setIsAtHead(ssp, isAtHighWatermark);
+    }
+
+    boolean needsMoreMessages(SystemStreamPartition ssp) {
+      LOG.debug("{}: needsMoreMessages from following SSP: {}. 
fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+              + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, 
fetchThresholdBytesEnabled,
+          getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, 
getNumMessagesInQueue(ssp),
+          perPartitionFetchThreshold);
+
+      if (fetchThresholdBytesEnabled) {
+        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
+      } else {
+        return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
+      }
+    }
+
+    void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+      LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope);
+
+      try {
+        put(ssp, envelope);
+      } catch (InterruptedException e) {
+        throw new SamzaException(
+            String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this,
+                envelope.getOffset(), ssp));
+      }
+    }
+  }
+}
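
For readers tracing the fetch-threshold arithmetic in the consumer above: the byte budget is halved because each message is buffered twice (once in the proxy, once in the consumer's internal queue), and the remainder is split evenly across partitions. A minimal sketch with hypothetical values — the variable names mirror the patch, the numbers and the symmetric message-count split are illustrative assumptions:

    // Sketch of the fetch-threshold split above; all numbers are hypothetical.
    public class FetchThresholdMath {
      public static void main(String[] args) {
        long fetchThresholdBytes = 100L * 1024 * 1024; // total byte budget (e.g. 100 MB)
        int fetchThreshold = 50000;                    // total message-count budget
        int numPartitions = 10;

        // messages get double buffered (proxy + consumer queue), hence divide by 2
        long perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
        int perPartitionFetchThreshold = fetchThreshold / numPartitions;

        System.out.println(perPartitionFetchThresholdBytes); // 5242880 (5 MB)
        System.out.println(perPartitionFetchThreshold);      // 5000
      }
    }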

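The register() method above deliberately keeps the numerically smaller of two offsets registered for the same partition, so a partition registered twice can never skip messages. A self-contained sketch of that rule — class and map names are hypothetical stand-ins for topicPartitionsToOffset:

    import java.util.HashMap;
    import java.util.Map;

    public class OlderOffsetWins {
      // partition -> registered offset, as a numeric string (Kafka offsets are longs)
      private final Map<String, String> partitionToOffset = new HashMap<>();

      // Keep the older (smaller) offset, mirroring KafkaSystemConsumer.register().
      void register(String partition, String offset) {
        String existing = partitionToOffset.get(partition);
        if (existing == null || Long.valueOf(existing).compareTo(Long.valueOf(offset)) > 0) {
          partitionToOffset.put(partition, offset);
        }
      }

      public static void main(String[] args) {
        OlderOffsetWins sink = new OlderOffsetWins();
        sink.register("topic-0", "500");
        sink.register("topic-0", "200"); // older offset wins
        System.out.println(sink.partitionToOffset); // prints {topic-0=200}
      }
    }
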
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e5cca36..f492518 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -62,6 +62,11 @@ object KafkaConfig {
   val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
   val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
 
+  val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"
+  val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"
+  val PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s"
+  val CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect"
+
   /**
    * Defines how low a queue can get for a single system/stream/partition combination before trying to fetch more messages for it.

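The format-string keys added above are expanded with String.format before lookup. A short illustration — the system name "kafka" and the config names are hypothetical examples:

    public class ConfigKeyExpansion {
      public static void main(String[] args) {
        String system = "kafka"; // hypothetical system name
        System.out.println(String.format("systems.%s.consumer.%s", system, "auto.offset.reset"));
        // -> systems.kafka.consumer.auto.offset.reset
        System.out.println(String.format("systems.%s.producer.bootstrap.servers", system));
        // -> systems.kafka.producer.bootstrap.servers
        System.out.println(String.format("systems.%s.consumer.zookeeper.connect", system));
        // -> systems.kafka.consumer.zookeeper.connect
      }
    }
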
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
deleted file mode 100644
index 6cebc28..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-/**
- * The configuration class for KafkaConsumer
- */
-public class KafkaConsumerConfig extends HashMap<String, Object> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
-
-  static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
-  static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
-  static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
-
-  /*
-   * By default, KafkaConsumer will fetch some big number of available messages for all the partitions.
-   * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
-   */
-  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
-
-  private KafkaConsumerConfig(Map<String, Object> map) {
-    super(map);
-  }
-
-  /**
-   * Helper method to create configs for use in Kafka consumer.
-   * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
-   *
-   * @param config config provided by the app.
-   * @param systemName system name to get the consumer configuration for.
-   * @param clientId client id to be used in the Kafka consumer.
-   * @return KafkaConsumerConfig
-   */
-  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
-
-    Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
-
-    //Kafka client configuration
-    String groupId = getConsumerGroupId(config);
-
-    Map<String, Object> consumerProps = new HashMap<>();
-    consumerProps.putAll(subConf);
-
-    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-
-    // These are values we enforce in samza, and they cannot be overwritten.
-
-    // Disable consumer auto-commit because Samza controls commits
-    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-    // Translate samza config value to kafka config value
-    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
-
-    // if consumer bootstrap servers are not configured, get them from the producer configs
-    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-      String bootstrapServers =
-          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-      if (StringUtils.isEmpty(bootstrapServers)) {
-        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
-      }
-      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    }
-
-    // Always use default partition assignment strategy. Do not allow override.
-    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
-    // default to byte[]
-    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-
-    // Override default max poll config if there is no value
-    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
-        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
-
-    return new KafkaConsumerConfig(consumerProps);
-  }
-
-  // group id should be unique per job
-  static String getConsumerGroupId(Config config) {
-    JobConfig jobConfig = new JobConfig(config);
-    Option jobNameOption = jobConfig.getName();
-    if (jobNameOption.isEmpty()) {
-      throw new ConfigException("Missing job name");
-    }
-    String jobName = (String) jobNameOption.get();
-
-    String jobId = jobConfig.getJobId();
-
-    return String.format("%s-%s", jobName, jobId);
-  }
-
-  // client id should be unique per job
-  public static String getConsumerClientId(Config config) {
-    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
-  }
-
-  public static String getProducerClientId(Config config) {
-    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
-  }
-
-  public static String getAdminClientId(Config config) {
-    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
-  }
-
-  static String getConsumerClientId(String id, Config config) {
-    JobConfig jobConfig = new JobConfig(config);
-    Option jobNameOption = jobConfig.getName();
-    if (jobNameOption.isEmpty()) {
-      throw new ConfigException("Missing job name");
-    }
-    String jobName = (String) jobNameOption.get();
-
-    String jobId = jobConfig.getJobId();
-
-    return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), 
jobName.replaceAll("\\W", "_"),
-        jobId.replaceAll("\\W", "_"));
-  }
-
-  /**
-   * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
-   * then need to convert them (see kafka.apache.org/documentation):
-   * "largest" -> "latest"
-   * "smallest" -> "earliest"
-   *
-   * If no setting specified we return "latest" (same as Kafka).
-   * @param autoOffsetReset value from the app provided config
-   * @return String representing the config value for "auto.offset.reset" property
-   */
-  static String getAutoOffsetResetValue(final String autoOffsetReset) {
-    final String SAMZA_OFFSET_LARGEST = "largest";
-    final String SAMZA_OFFSET_SMALLEST = "smallest";
-    final String KAFKA_OFFSET_LATEST = "latest";
-    final String KAFKA_OFFSET_EARLIEST = "earliest";
-    final String KAFKA_OFFSET_NONE = "none";
-
-    if (autoOffsetReset == null) {
-      return KAFKA_OFFSET_LATEST; // return default
-    }
-
-    // accept kafka values directly
-    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
-        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
-      return autoOffsetReset;
-    }
-
-    String newAutoOffsetReset;
-    switch (autoOffsetReset) {
-      case SAMZA_OFFSET_LARGEST:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
-        break;
-      case SAMZA_OFFSET_SMALLEST:
-        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
-        break;
-      default:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
-    }
-    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, 
newAutoOffsetReset);
-    return newAutoOffsetReset;
-  }
-}
\ No newline at end of file
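
Although the file is deleted here, its getAutoOffsetResetValue() logic — translating Samza's legacy "largest"/"smallest" reset values to Kafka's "latest"/"earliest" — is worth keeping in mind when reading job configs. A condensed, runnable restatement of that translation (class and method names are hypothetical):

    public class OffsetResetTranslation {
      // Condensed from the deleted getAutoOffsetResetValue() above.
      static String toKafkaOffsetReset(String autoOffsetReset) {
        if (autoOffsetReset == null) {
          return "latest"; // Kafka's default
        }
        switch (autoOffsetReset) {
          case "earliest":
          case "latest":
          case "none":
            return autoOffsetReset; // Kafka values pass through unchanged
          case "smallest":
            return "earliest";
          case "largest":
          default:
            return "latest";
        }
      }

      public static void main(String[] args) {
        System.out.println(toKafkaOffsetReset("largest"));  // latest
        System.out.println(toKafkaOffsetReset("smallest")); // earliest
        System.out.println(toKafkaOffsetReset(null));       // latest
      }
    }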

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 04071c1..e47add7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -90,6 +90,8 @@ class KafkaConsumerProxy<K, V> {
     consumerPollThread.setDaemon(true);
     consumerPollThread.setName(
         "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - 
" + systemName);
+
+    LOG.info("Creating KafkaConsumerProxy with systeName={}, clientId={}, 
metricsName={}", systemName, clientId, metricName);
   }
 
   /**
