This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a3d6a  SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
02a3d6a is described below

commit 02a3d6a62fd77b00316b1b92c76713687320d24e
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Mar 11 17:35:33 2019 -0700

    SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
    
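    This replaces https://github.com/apache/samza/pull/941. Instead of adding an object hook into KafkaConsumerProxy, this change makes KafkaConsumerProxy public and directly extensible: a subclass can override the protected handleNewRecord method to customize how each consumed ConsumerRecord is turned into an IncomingMessageEnvelope, and KafkaSystemConsumer now obtains its proxy from a KafkaConsumerProxyFactory so that such a subclass can be plugged in.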
    
    Author: Cameron Lee <[email protected]>
    
    Reviewers: Prateek Maheshwari <[email protected]>
    
    Closes #947 from cameronlee314/kafka_proxy_refactor
---
 .../samza/system/kafka/KafkaSystemConsumer.java    | 12 ++--
 .../samza/system/kafka/KafkaConsumerProxy.java     | 65 ++++++++++++++++++----
 .../system/kafka/KafkaConsumerProxyFactory.java    | 29 ++++++++++
 .../samza/system/kafka/KafkaSystemFactory.scala    |  7 ++-
 .../system/kafka/TestKafkaSystemConsumer.java      |  4 +-
 5 files changed, 98 insertions(+), 19 deletions(-)

diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 93ded8b..ee51222 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -66,7 +66,7 @@ public class KafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements Sy
 
   // This proxy contains a separate thread, which reads kafka messages (with 
consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  final private KafkaConsumerProxy<K, V> proxy;
 
   // keep registration data until the start - mapping between registered SSPs 
and topicPartitions, and their offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
@@ -80,13 +80,13 @@ public class KafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements Sy
    * @param kafkaConsumer kafka Consumer object to be used by this system 
consumer
    * @param systemName system name for which we create the consumer
    * @param config application config
-   * @param clientId clientId from the kafka consumer to be used in the 
KafkaConsumerProxy
+   * @param clientId clientId from the kafka consumer
+   * @param kafkaConsumerProxyFactory factory for creating a 
KafkaConsumerProxy to use in this consumer
    * @param metrics metrics for this KafkaSystemConsumer
    * @param clock system clock
    */
   public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, 
Config config, String clientId,
-      KafkaSystemConsumerMetrics metrics, Clock clock) {
-
+      KafkaConsumerProxyFactory<K, V> kafkaConsumerProxyFactory, 
KafkaSystemConsumerMetrics metrics, Clock clock) {
     super(metrics.registry(), clock, metrics.getClass().getName());
 
     this.kafkaConsumer = kafkaConsumer;
@@ -102,8 +102,8 @@ public class KafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements Sy
 
     // Create the proxy to do the actual message reading.
     String metricName = String.format("%s-%s", systemName, clientId);
-    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink, metrics, metricName);
-    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
+    proxy = kafkaConsumerProxyFactory.create(this.messageSink);
+    LOG.info("{}: Created proxy {} ", this, proxy);
   }
 
   /**
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 0ea16e7..93b1ab2 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -35,6 +35,7 @@ import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * This class is not thread safe. There will be only one instance of this 
class per KafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() 
method for details.
  */
-class KafkaConsumerProxy<K, V> {
+public class KafkaConsumerProxy<K, V> {
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerProxy.class);
 
   private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
@@ -74,8 +75,8 @@ class KafkaConsumerProxy<K, V> {
   private volatile Throwable failureCause = null;
   private final CountDownLatch consumerPollThreadStartLatch = new 
CountDownLatch(1);
 
-  KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String 
clientId,
-      KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
+  public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, 
String clientId,
+      KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
       String metricName) {
 
     this.kafkaConsumer = kafkaConsumer;
@@ -310,12 +311,8 @@ class KafkaConsumerProxy<K, V> {
         results.put(ssp, messages);
       }
 
-      K key = record.key();
-      Object value = record.value();
-      IncomingMessageEnvelope imEnvelope =
-          new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), 
key, value, getRecordSize(record),
-              record.timestamp(), Instant.now().toEpochMilli());
-      messages.add(imEnvelope);
+      IncomingMessageEnvelope incomingMessageEnvelope = 
handleNewRecord(record, ssp);
+      messages.add(incomingMessageEnvelope);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("# records per SSP:");
@@ -328,7 +325,28 @@ class KafkaConsumerProxy<K, V> {
     return results;
   }
 
-  private int getRecordSize(ConsumerRecord<K, V> r) {
+  /**
+   * Convert a {@link ConsumerRecord} to an {@link IncomingMessageEnvelope}. 
This may also execute some other custom
+   * logic for each new {@link IncomingMessageEnvelope}.
+   *
+   * This has a protected visibility so that {@link KafkaConsumerProxy} can be 
extended to add special handling logic
+   * for custom Kafka systems.
+   *
+   * @param consumerRecord {@link ConsumerRecord} from Kafka that was consumed
+   * @param systemStreamPartition {@link SystemStreamPartition} corresponding 
to the record
+   * @return {@link IncomingMessageEnvelope} corresponding to the {@code 
consumerRecord}
+   */
+  protected IncomingMessageEnvelope handleNewRecord(ConsumerRecord<K, V> 
consumerRecord,
+      SystemStreamPartition systemStreamPartition) {
+    return new IncomingMessageEnvelope(systemStreamPartition, 
String.valueOf(consumerRecord.offset()),
+        consumerRecord.key(), consumerRecord.value(), 
getRecordSize(consumerRecord), consumerRecord.timestamp(),
+        Instant.now().toEpochMilli());
+  }
+
+  /**
+   * Protected to help extensions of this class build {@link 
IncomingMessageEnvelope}s.
+   */
+  protected int getRecordSize(ConsumerRecord<K, V> r) {
     int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
     return keySize + r.serializedValueSize();
   }
@@ -423,5 +441,32 @@ class KafkaConsumerProxy<K, V> {
   public String toString() {
     return String.format("consumerProxy-%s-%s", systemName, clientId);
   }
+
+
+
+  /**
+   * Used to create an instance of {@link KafkaConsumerProxy}. This can be 
overridden in case an extension of
+   * {@link KafkaConsumerProxy} needs to be used within kafka system 
components like {@link KafkaSystemConsumer}.
+   */
+  public static class BaseFactory<K, V> implements 
KafkaConsumerProxyFactory<K, V> {
+    private final KafkaConsumer<K, V> kafkaConsumer;
+    private final String systemName;
+    private final String clientId;
+    private final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics;
+
+    public BaseFactory(KafkaConsumer<K, V> kafkaConsumer, String systemName, 
String clientId,
+        KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics) {
+      this.kafkaConsumer = kafkaConsumer;
+      this.systemName = systemName;
+      this.clientId = clientId;
+      this.kafkaSystemConsumerMetrics = kafkaSystemConsumerMetrics;
+    }
+
+    public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, 
V>.KafkaConsumerMessageSink messageSink) {
+      String metricName = String.format("%s-%s", systemName, clientId);
+      return new KafkaConsumerProxy<>(this.kafkaConsumer, this.systemName, 
this.clientId, messageSink,
+          this.kafkaSystemConsumerMetrics, metricName);
+    }
+  }
 }
 
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
new file mode 100644
index 0000000..a566d2a
--- /dev/null
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka;
+
+/**
+ * Used to create an instance of {@link KafkaConsumerProxy}. This can be 
overridden in case an extension of
+ * {@link KafkaConsumerProxy} needs to be used within kafka system components 
like {@link KafkaSystemConsumer}.
+ */
+public interface KafkaConsumerProxyFactory<K, V> {
+  KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, 
V>.KafkaConsumerMessageSink messageSink);
+}
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index cca7a6b..035d3a8 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -57,8 +57,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val kafkaConsumer = 
KafkaSystemConsumer.createKafkaConsumerImpl[Array[Byte], 
Array[Byte]](systemName, kafkaConsumerConfig)
     info("Created kafka consumer for system %s, clientId %s: %s" format 
(systemName, clientId, kafkaConsumer))
 
-    val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId, metrics,
-      new SystemClock)
+    val kafkaConsumerProxyFactory =
+      new KafkaConsumerProxy.BaseFactory[Array[Byte], 
Array[Byte]](kafkaConsumer, systemName, clientId, metrics)
+
+    val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId,
+      kafkaConsumerProxyFactory, metrics, new SystemClock)
     info("Created samza system consumer for system %s, config %s: %s" 
format(systemName, config, kafkaSystemConsumer))
 
     kafkaSystemConsumer
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 981ac45..e66c374 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -41,6 +41,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
 
 
 public class TestKafkaSystemConsumer {
@@ -215,7 +216,8 @@ public class TestKafkaSystemConsumer {
   static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
     public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, 
Config config, String clientId,
         KafkaSystemConsumerMetrics metrics, Clock clock) {
-      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+      super(kafkaConsumer, systemName, config, clientId, (messageSink) -> 
mock(KafkaConsumerProxy.class), metrics,
+          clock);
     }
 
     @Override

Reply via email to