This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0487804 Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in
PulsarKafkaConsumer. (#3911)
0487804 is described below
commit 0487804c6ba39f227c1f2e7d293700dcc3dcf646
Author: Marvin Cai <[email protected]>
AuthorDate: Thu Mar 28 13:33:58 2019 -0700
Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in
PulsarKafkaConsumer. (#3911)
* Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in
PulsarKafkaConsumer.
Apply onConsume in poll() before returning the ConsumerRecords,
apply onCommit in doCommitOffsets() before committing all offsets.
Also update the docs to reflect support for
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
* Update error message for applying onConsume and onCommit for interceptors
to include the specific interceptor name.
---
.../clients/consumer/PulsarKafkaConsumer.java | 46 +++++++++++++++++++++-
site2/docs/adaptors-kafka.md | 1 +
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0592a4b..0c0059f 100644
---
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -86,6 +86,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
private final SubscriptionInitialPosition strategy;
+ private List<ConsumerInterceptor<K, V>> interceptors;
+
private volatile boolean closed = false;
private final int maxRecordsInSinglePoll;
@@ -162,6 +164,9 @@ public class PulsarKafkaConsumer<K, V> implements
Consumer<K, V>, MessageListene
maxRecordsInSinglePoll = 1000;
}
+ interceptors = (List) config.getConfiguredInstances(
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
+
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder =
PulsarClientKafkaConfig.getClientBuilder(properties);
@@ -374,7 +379,8 @@ public class PulsarKafkaConsumer<K, V> implements
Consumer<K, V>, MessageListene
commitAsync();
}
- return new ConsumerRecords<>(records);
+ // If no interceptor is provided, the interceptors list will be an empty
list, and the original ConsumerRecords will be returned.
+ return applyConsumerInterceptorsOnConsume(interceptors, new
ConsumerRecords<>(records));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -438,6 +444,7 @@ public class PulsarKafkaConsumer<K, V> implements
Consumer<K, V>, MessageListene
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition,
OffsetAndMetadata> offsets) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer =
consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
@@ -457,6 +464,43 @@ public class PulsarKafkaConsumer<K, V> implements
Consumer<K, V>, MessageListene
return offsets;
}
+ /**
+ * Apply all onConsume methods in a list of ConsumerInterceptors.
+ * Catch any exception during the process.
+ *
+ * @param interceptors Interceptors provided.
+ * @param consumerRecords ConsumerRecords returned by calling {@link
#poll(long)}.
+ * @return ConsumerRecords after applying all
ConsumerInterceptor in interceptors list.
+ */
+ private ConsumerRecords
applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>>
interceptors, ConsumerRecords consumerRecords) {
+ ConsumerRecords processedConsumerRecords = consumerRecords;
+ for (ConsumerInterceptor interceptor : interceptors) {
+ try {
+ processedConsumerRecords =
interceptor.onConsume(processedConsumerRecords);
+ } catch (Exception e) {
+ log.warn("Error executing onConsume for interceptor {}.",
interceptor.getClass().getCanonicalName(), e);
+ }
+ }
+ return processedConsumerRecords;
+ }
+
+ /**
+ * Apply all onCommit methods in a list of ConsumerInterceptors.
+ * Catch any exception during the process.
+ *
+ * @param interceptors Interceptors provided.
+ * @param offsets Offsets that need to be committed.
+ */
+ private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K,
V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
+ for (ConsumerInterceptor interceptor : interceptors) {
+ try {
+ interceptor.onCommit(offsets);
+ } catch (Exception e) {
+ log.warn("Error executing onCommit for interceptor {}.",
interceptor.getClass().getCanonicalName(), e);
+ }
+ }
+ }
+
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 785cdcc..ca4f877 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -209,6 +209,7 @@ Properties:
| `fetch.min.bytes` | Ignored |
|
| `fetch.max.bytes` | Ignored |
|
| `fetch.max.wait.ms` | Ignored |
|
+| `interceptor.classes` | Yes |
|
| `metadata.max.age.ms` | Ignored |
|
| `max.partition.fetch.bytes` | Ignored |
|
| `send.buffer.bytes` | Ignored |
|