Added a new Samza Kafka system consumer built on the new Kafka consumer API

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c0ea25cb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c0ea25cb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c0ea25cb

Branch: refs/heads/NewKafkaSystemConsumer
Commit: c0ea25cbc674a1d67546f7f47a6f36f6ee58bdc6
Parents: 7254460
Author: Boris S <[email protected]>
Authored: Wed Aug 29 10:52:30 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Aug 29 10:52:30 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java   | 15 ++-
 .../samza/system/kafka/KafkaConsumerProxy.java  |  7 +-
 .../samza/system/kafka/KafkaSystemFactory.scala | 59 +-----------
 .../system/kafka/NewKafkaSystemConsumer.java    | 97 +++++++++++++-------
 4 files changed, 80 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 97360e2..b29a041 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -40,6 +40,7 @@ public class KafkaConsumerConfig extends ConsumerConfig {
   private static final String SAMZA_OFFSET_SMALLEST = "smallest";
   private static final String KAFKA_OFFSET_LATEST = "latest";
   private static final String KAFKA_OFFSET_EARLIEST = "earliest";
+  private static final String KAFKA_OFFSET_NONE = "none";
   /*
    * By default, KafkaConsumer will fetch ALL available messages for all the 
partitions.
    * This may cause memory issues. That's why we will limit the number of 
messages per partition we get on EACH poll().
@@ -64,16 +65,14 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
-    /********************************************
-     * Open-source Kafka Consumer configuration *
-     *******************************************/
+    //Open-source Kafka Consumer configuration
     consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false"); // Disable consumer auto-commit
 
     consumerProps.setProperty(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
         getAutoOffsetResetValue(consumerProps));  // Translate samza config 
value to kafka config value
 
-    // makesure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
+    // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
     if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       // get it from the producer config
       String bootstrapServer = 
config.get(String.format("systems.%s.producer.%s", systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -139,6 +138,14 @@ public class KafkaConsumerConfig extends ConsumerConfig {
    */
   static String getAutoOffsetResetValue(Properties properties) {
     String autoOffsetReset = 
properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
KAFKA_OFFSET_LATEST);
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) ||
+        autoOffsetReset.equals(KAFKA_OFFSET_LATEST) ||
+        autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
     switch (autoOffsetReset) {
       case SAMZA_OFFSET_LARGEST:
         return KAFKA_OFFSET_LATEST;

http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 66971af..01b345a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -69,7 +69,6 @@ public class KafkaConsumerProxy<K, V> {
   private final Map<SystemStreamPartition, Long> nextOffsets = new 
ConcurrentHashMap<>();
   // lags behind the high water mark, as reported by the Kafka consumer.
   private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
-  private final NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper;
 
   private volatile boolean isRunning = false;
   private volatile Throwable failureCause = null;
@@ -77,7 +76,7 @@ public class KafkaConsumerProxy<K, V> {
 
   public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, 
String clientId,
       NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
-      String metricName, NewKafkaSystemConsumer.ValueUnwrapper<V> 
valueUnwrapper) {
+      String metricName) {
 
     this.kafkaConsumer = kafkaConsumer;
     this.systemName = systemName;
@@ -85,7 +84,6 @@ public class KafkaConsumerProxy<K, V> {
     this.kafkaConsumerMetrics = samzaConsumerMetrics;
     this.metricName = metricName;
     this.clientId = clientId;
-    this.valueUnwrapper = valueUnwrapper;
 
     // TODO - see if we need new metrics (not host:port based)
     this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0);
@@ -257,8 +255,7 @@ public class KafkaConsumerProxy<K, V> {
       //}
 
       final K key = r.key();
-      final Object value =
-          valueUnwrapper == null ? r.value() : 
valueUnwrapper.unwrapValue(ssp.getSystemStream(), r.value());
+      final Object value = r.value();
       IncomingMessageEnvelope imEnvelope =
           new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, 
value, msgSize);
       listMsgs.add(imEnvelope);

http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index c7f6aed..6a5eda9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -55,64 +55,9 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
-    // Kind of goofy to need a producer config for consumers, but we need 
metadata.
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId)
-    val bootstrapServers = producerConfig.bootsrapServers
-    //val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)
-
-    //val kafkaConfig = new KafkaConfig(config)
-
-
-   // val timeout = consumerConfig.socketTimeoutMs
-    //val bufferSize = consumerConfig.socketReceiveBufferBytes
-    //val fetchSize = new 
StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, 
config.getFetchMessageMaxBytesTopics(systemName))
-    //val consumerMinSize = consumerConfig.fetchMinBytes
-    //val consumerMaxWait = consumerConfig.fetchWaitMaxMs
-    //val autoOffsetResetDefault = consumerConfig.autoOffsetReset
-    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = 
config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
-    val fetchThresholdBytes = 
config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
-    //val offsetGetter = new GetOffset(autoOffsetResetDefault, 
autoOffsetResetTopics)
-    //val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, 
clientId, timeout)
-
-
-    val kafkaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] =
-      NewKafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
-
-    def valueUnwrapper: NewKafkaSystemConsumer.ValueUnwrapper[Array[Byte]] = 
null;// TODO add real unrapper from
-    val kc = new NewKafkaSystemConsumer (
-      kafkaConsumer, systemName, config, clientId,
-      metrics, new SystemClock, false, valueUnwrapper)
-
-    kc
-    /*
-      new KafkaSystemConsumer(
-      systemName = systemName,
-      systemAdmin = getAdmin(systemName, config),
-      metrics = metrics,
-      metadataStore = metadataStore,
-      clientId = clientId,
-      timeout = timeout,
-      bufferSize = bufferSize,
-      fetchSize = fetchSize,
-      consumerMinSize = consumerMinSize,
-      consumerMaxWait = consumerMaxWait,
-      fetchThreshold = fetchThreshold,
-      fetchThresholdBytes = fetchThresholdBytes,
-      fetchLimitByBytesEnabled = 
config.isConsumerFetchThresholdBytesEnabled(systemName),
-      offsetGetter = offsetGetter)
-      */
-  }
-
-  /*
-  def getKafkaConsumerImpl(systemName: String, config: KafkaConfig) = {
-    info("Consumer properties in getKafkaConsumerImpl: systemName: {}, 
consumerProperties: {}", systemName, config)
-
-    val byteArrayDeserializer = new ByteArrayDeserializer
-    new KafkaConsumer[Array[Byte], 
Array[Byte]](config.configForVanillaConsumer(),
-      byteArrayDeserializer, byteArrayDeserializer)
+    NewKafkaSystemConsumer.getNewKafkaSystemConsumer(
+      systemName, config, clientId, metrics, new SystemClock)
   }
-  */
 
   def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry): SystemProducer = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)

http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index 26db610..dd7e584 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -21,6 +21,7 @@
 
 package org.apache.samza.system.kafka;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,29 +41,24 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.JavaConversions;
 
 
 public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap 
implements SystemConsumer{
 
   private static final Logger LOG = 
LoggerFactory.getLogger(NewKafkaSystemConsumer.class);
 
-  /**
-   * Provides a way to unwrap the value further. It is used for intermediate 
stream messages.
-   * @param <T> value type
-   */
-  public interface ValueUnwrapper<T> {
-    Object unwrapValue(SystemStream systemStream, T value);
-  }
-
   private static final long FETCH_THRESHOLD = 50000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;
   private final Consumer<K,V> kafkaConsumer;
@@ -75,7 +71,6 @@ public class NewKafkaSystemConsumer<K,V> extends 
BlockingEnvelopeMap implements
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
-  private final ValueUnwrapper<V> valueUnwrapper;
 
   // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
   private KafkaConsumerMessageSink messageSink;
@@ -99,9 +94,7 @@ public class NewKafkaSystemConsumer<K,V> extends 
BlockingEnvelopeMap implements
       Config config,
       String clientId,
       KafkaSystemConsumerMetrics metrics,
-      Clock clock,
-      boolean fetchThresholdBytesEnabled,
-      ValueUnwrapper<V> valueUnwrapper) {
+      Clock clock) {
 
     super(metrics.registry(),clock, metrics.getClass().getName());
 
@@ -109,41 +102,64 @@ public class NewKafkaSystemConsumer<K,V> extends 
BlockingEnvelopeMap implements
     this.clientId = clientId;
     this.systemName = systemName;
     this.config = config;
-    this.fetchThresholdBytesEnabled = fetchThresholdBytesEnabled;
     this.metricName = systemName + " " + clientId;
 
     this.kafkaConsumer = kafkaConsumer;
-    this.valueUnwrapper = valueUnwrapper;
+
+    this.fetchThresholdBytesEnabled = new 
KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
     LOG.info(String.format(
         "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, 
metricName=%s with liKafkaConsumer=%s",
         systemName, clientId, metricName, this.kafkaConsumer.toString()));
   }
 
-  public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String 
systemName, String clientId, Config config) {
+  public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(
+      String systemName,
+      Config config,
+      String clientId,
+      KafkaSystemConsumerMetrics metrics,
+      Clock clock) {
+
+    // extract consumer configs and create kafka consumer
+    KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, 
clientId, config);
+
+    return new NewKafkaSystemConsumer(kafkaConsumer,
+        systemName,
+        config,
+        clientId,
+        metrics,
+        clock);
+  }
+
+  /**
+   * create kafka consumer
+   * @param systemName
+   * @param clientId
+   * @param config
+   * @return kafka consumer
+   */
+  private static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String 
systemName, String clientId, Config config) {
 
     Map<String, String> injectProps = new HashMap<>();
-    injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-    injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
+    // the consumer is fully typed, and deserialization can be too. But in 
case it is not provided we should
+    // default to byte[]
+    if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default key serialization for the consumer(for {}) to 
ByteArrayDeserializer", systemName);
+      injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    }
+    if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default value serialization for the consumer(for {}) to 
ByteArrayDeserializer", systemName);
+      injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    }
+
+    // extract kafka consumer configs
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, 
clientId, injectProps);
 
     LOG.info("==============>Consumer properties in getKafkaConsumerImpl: 
systemName: {}, consumerProperties: {}", systemName, 
consumerConfig.originals());
-    /*
-    Map<String, Object> kafkaConsumerConfig = 
consumerConfig.originals().entrySet().stream()
-        .collect(Collectors.toMap((kv)->kv.getKey(), 
(kv)->(Object)kv.getValue()));
-*/
-
-    return new KafkaConsumer<byte[], byte[]>(consumerConfig.originals());
-  }
 
-  /**
-   * return system name for this consumer
-   * @return system name
-   */
-  public String getSystemName() {
-    return systemName;
+    return new KafkaConsumer<>(consumerConfig.originals());
   }
 
   @Override
@@ -156,7 +172,7 @@ public class NewKafkaSystemConsumer<K,V> extends 
BlockingEnvelopeMap implements
       LOG.warn("attempting to start a stopped consumer");
       return;
     }
-LOG.info("==============>About to start consumer");
+    LOG.info("==============>About to start consumer");
     // initialize the subscriptions for all the registered TopicPartitions
     startSubscription();
     LOG.info("==============>subscription started");
@@ -193,7 +209,7 @@ LOG.info("==============>About to start consumer");
 
     // create the thread with the consumer
     proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink,
-        samzaConsumerMetrics, metricName, valueUnwrapper);
+        samzaConsumerMetrics, metricName);
 
     LOG.info("==============>Created consumer proxy: " + proxy);
   }
@@ -363,6 +379,23 @@ LOG.info("==============>About to start consumer");
     return new SystemStreamPartition(systemName, tp.topic(), new 
Partition(tp.partition()));
   }
 
+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  private static Set<SystemStream> getIntermediateStreams(Config config) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    Collection<String> streamIds = 
JavaConversions.asJavaCollection(streamConfig.getStreamIds());
+    return streamIds.stream()
+        .filter(streamConfig::getIsIntermediateStream)
+        .map(id -> streamConfig.streamIdToSystemStream(id))
+        .collect(Collectors.toSet());
+  }
+
   ////////////////////////////////////
   // inner class for the message sink
   ////////////////////////////////////

Reply via email to