This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 02a3d6a SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
02a3d6a is described below
commit 02a3d6a62fd77b00316b1b92c76713687320d24e
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Mar 11 17:35:33 2019 -0700
SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
This replaces https://github.com/apache/samza/pull/941. Instead of adding
an object hook into KafkaConsumerProxy, this allows KafkaConsumerProxy to be
directly extended for record processing.
Author: Cameron Lee <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>
Closes #947 from cameronlee314/kafka_proxy_refactor
---
.../samza/system/kafka/KafkaSystemConsumer.java | 12 ++--
.../samza/system/kafka/KafkaConsumerProxy.java | 65 ++++++++++++++++++----
.../system/kafka/KafkaConsumerProxyFactory.java | 29 ++++++++++
.../samza/system/kafka/KafkaSystemFactory.scala | 7 ++-
.../system/kafka/TestKafkaSystemConsumer.java | 4 +-
5 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 93ded8b..ee51222 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -66,7 +66,7 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
// This proxy contains a separate thread, which reads kafka messages (with
consumer.poll()) and populates
// BlockingEnvelopMap's buffers.
- final private KafkaConsumerProxy proxy;
+ final private KafkaConsumerProxy<K, V> proxy;
// keep registration data until the start - mapping between registered SSPs
and topicPartitions, and their offsets
final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
@@ -80,13 +80,13 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
* @param kafkaConsumer kafka Consumer object to be used by this system
consumer
* @param systemName system name for which we create the consumer
* @param config application config
- * @param clientId clientId from the kafka consumer to be used in the
KafkaConsumerProxy
+ * @param clientId clientId from the kafka consumer
+ * @param kafkaConsumerProxyFactory factory for creating a
KafkaConsumerProxy to use in this consumer
* @param metrics metrics for this KafkaSystemConsumer
* @param clock system clock
*/
public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName,
Config config, String clientId,
- KafkaSystemConsumerMetrics metrics, Clock clock) {
-
+ KafkaConsumerProxyFactory<K, V> kafkaConsumerProxyFactory,
KafkaSystemConsumerMetrics metrics, Clock clock) {
super(metrics.registry(), clock, metrics.getClass().getName());
this.kafkaConsumer = kafkaConsumer;
@@ -102,8 +102,8 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
// Create the proxy to do the actual message reading.
String metricName = String.format("%s-%s", systemName, clientId);
- proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId,
messageSink, metrics, metricName);
- LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
+ proxy = kafkaConsumerProxyFactory.create(this.messageSink);
+ LOG.info("{}: Created proxy {} ", this, proxy);
}
/**
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 0ea16e7..93b1ab2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -35,6 +35,7 @@ import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
* This class is not thread safe. There will be only one instance of this
class per KafkaSystemConsumer object.
* We still need some synchronization around kafkaConsumer. See pollConsumer()
method for details.
*/
-class KafkaConsumerProxy<K, V> {
+public class KafkaConsumerProxy<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerProxy.class);
private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
@@ -74,8 +75,8 @@ class KafkaConsumerProxy<K, V> {
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new
CountDownLatch(1);
- KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String
clientId,
- KafkaSystemConsumer.KafkaConsumerMessageSink messageSink,
KafkaSystemConsumerMetrics samzaConsumerMetrics,
+ public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName,
String clientId,
+ KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink,
KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
this.kafkaConsumer = kafkaConsumer;
@@ -310,12 +311,8 @@ class KafkaConsumerProxy<K, V> {
results.put(ssp, messages);
}
- K key = record.key();
- Object value = record.value();
- IncomingMessageEnvelope imEnvelope =
- new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()),
key, value, getRecordSize(record),
- record.timestamp(), Instant.now().toEpochMilli());
- messages.add(imEnvelope);
+ IncomingMessageEnvelope incomingMessageEnvelope =
handleNewRecord(record, ssp);
+ messages.add(incomingMessageEnvelope);
}
if (LOG.isDebugEnabled()) {
LOG.debug("# records per SSP:");
@@ -328,7 +325,28 @@ class KafkaConsumerProxy<K, V> {
return results;
}
- private int getRecordSize(ConsumerRecord<K, V> r) {
+ /**
+ * Convert a {@link ConsumerRecord} to an {@link IncomingMessageEnvelope}.
This may also execute some other custom
+ * logic for each new {@link IncomingMessageEnvelope}.
+ *
+ * This has a protected visibility so that {@link KafkaConsumerProxy} can be
extended to add special handling logic
+ * for custom Kafka systems.
+ *
+ * @param consumerRecord {@link ConsumerRecord} from Kafka that was consumed
+ * @param systemStreamPartition {@link SystemStreamPartition} corresponding
to the record
+ * @return {@link IncomingMessageEnvelope} corresponding to the {@code
consumerRecord}
+ */
+ protected IncomingMessageEnvelope handleNewRecord(ConsumerRecord<K, V>
consumerRecord,
+ SystemStreamPartition systemStreamPartition) {
+ return new IncomingMessageEnvelope(systemStreamPartition,
String.valueOf(consumerRecord.offset()),
+ consumerRecord.key(), consumerRecord.value(),
getRecordSize(consumerRecord), consumerRecord.timestamp(),
+ Instant.now().toEpochMilli());
+ }
+
+ /**
+ * Protected to help extensions of this class build {@link
IncomingMessageEnvelope}s.
+ */
+ protected int getRecordSize(ConsumerRecord<K, V> r) {
int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
return keySize + r.serializedValueSize();
}
@@ -423,5 +441,32 @@ class KafkaConsumerProxy<K, V> {
public String toString() {
return String.format("consumerProxy-%s-%s", systemName, clientId);
}
+
+
+
+ /**
+ * Used to create an instance of {@link KafkaConsumerProxy}. This can be
overridden in case an extension of
+ * {@link KafkaConsumerProxy} needs to be used within kafka system
components like {@link KafkaSystemConsumer}.
+ */
+ public static class BaseFactory<K, V> implements
KafkaConsumerProxyFactory<K, V> {
+ private final KafkaConsumer<K, V> kafkaConsumer;
+ private final String systemName;
+ private final String clientId;
+ private final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics;
+
+ public BaseFactory(KafkaConsumer<K, V> kafkaConsumer, String systemName,
String clientId,
+ KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics) {
+ this.kafkaConsumer = kafkaConsumer;
+ this.systemName = systemName;
+ this.clientId = clientId;
+ this.kafkaSystemConsumerMetrics = kafkaSystemConsumerMetrics;
+ }
+
+ public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K,
V>.KafkaConsumerMessageSink messageSink) {
+ String metricName = String.format("%s-%s", systemName, clientId);
+ return new KafkaConsumerProxy<>(this.kafkaConsumer, this.systemName,
this.clientId, messageSink,
+ this.kafkaSystemConsumerMetrics, metricName);
+ }
+ }
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
new file mode 100644
index 0000000..a566d2a
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka;
+
+/**
+ * Used to create an instance of {@link KafkaConsumerProxy}. This can be
overridden in case an extension of
+ * {@link KafkaConsumerProxy} needs to be used within kafka system components
like {@link KafkaSystemConsumer}.
+ */
+public interface KafkaConsumerProxyFactory<K, V> {
+ KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K,
V>.KafkaConsumerMessageSink messageSink);
+}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index cca7a6b..035d3a8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -57,8 +57,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val kafkaConsumer =
KafkaSystemConsumer.createKafkaConsumerImpl[Array[Byte],
Array[Byte]](systemName, kafkaConsumerConfig)
info("Created kafka consumer for system %s, clientId %s: %s" format
(systemName, clientId, kafkaConsumer))
- val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer,
systemName, config, clientId, metrics,
- new SystemClock)
+ val kafkaConsumerProxyFactory =
+ new KafkaConsumerProxy.BaseFactory[Array[Byte],
Array[Byte]](kafkaConsumer, systemName, clientId, metrics)
+
+ val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer,
systemName, config, clientId,
+ kafkaConsumerProxyFactory, metrics, new SystemClock)
info("Created samza system consumer for system %s, config %s: %s"
format(systemName, config, kafkaSystemConsumer))
kafkaSystemConsumer
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 981ac45..e66c374 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -41,6 +41,7 @@ import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
public class TestKafkaSystemConsumer {
@@ -215,7 +216,8 @@ public class TestKafkaSystemConsumer {
static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName,
Config config, String clientId,
KafkaSystemConsumerMetrics metrics, Clock clock) {
- super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+ super(kafkaConsumer, systemName, config, clientId, (messageSink) ->
mock(KafkaConsumerProxy.class), metrics,
+ clock);
}
@Override