This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 25539af CAMEL-13339 - Fix to use last processed offset maintained by
Camel KafkaConsumer to avoid message loss upon partition revoke
25539af is described below
commit 25539afcfdf1966eaafc83b28657e6f410a5bc40
Author: Viswa Ramamoorthy <[email protected]>
AuthorDate: Fri Mar 22 07:37:04 2019 -0400
CAMEL-13339 - Fix to use last processed offset maintained by Camel
KafkaConsumer to avoid message loss upon partition revoke
---
.../camel/component/kafka/KafkaConsumer.java | 17 ++-
.../KafkaConsumerRebalancePartitionRevokeTest.java | 123 +++++++++++++++++++++
2 files changed, 136 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ebb987c..9417dc7 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -22,9 +22,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
@@ -171,6 +173,7 @@ public class KafkaConsumer extends DefaultConsumer {
private final Pattern topicPattern;
private final String threadId;
private final Properties kafkaProps;
+ private final Map<String, Long> lastProcessedOffset = new
ConcurrentHashMap<>();
KafkaFetchRecords(String topicName, Pattern topicPattern, String id,
Properties kafkaProps) {
this.topicName = topicName;
@@ -294,6 +297,7 @@ public class KafkaConsumer extends DefaultConsumer {
long partitionLastOffset = -1;
Iterator<ConsumerRecord<Object, Object>>
recordIterator = allRecords.records(partition).iterator();
+ log.debug("Records count {} received for partition
{}", allRecords.records(partition).size(), partition);
if (!breakOnErrorHit && recordIterator.hasNext()) {
ConsumerRecord<Object, Object> record;
@@ -341,6 +345,8 @@ public class KafkaConsumer extends DefaultConsumer {
} else {
// record was success so remember its
offset
partitionLastOffset = record.offset();
+ //lastProcessedOffset would be used by
Consumer re-balance listener to preserve offset state upon partition revoke
+
lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
}
}
@@ -418,12 +424,15 @@ public class KafkaConsumer extends DefaultConsumer {
log.debug("onPartitionsRevoked: {} from topic {}", threadId,
topicName);
StateRepository<String, String> offsetRepository =
endpoint.getConfiguration().getOffsetRepository();
- if (offsetRepository != null) {
for (TopicPartition partition : partitions) {
- long offset = consumer.position(partition);
- log.debug("Saving offset repository state {} from topic {}
with offset: {}", threadId, topicName, offset);
- offsetRepository.setState(serializeOffsetKey(partition),
serializeOffsetValue(offset));
+ String offsetKey = serializeOffsetKey(partition);
+ Long offset = lastProcessedOffset.get(offsetKey);
+ if (offset == null) {
+ offset = -1l;
}
+ log.debug("Saving offset repository state {} from offsetKey {}
with offset: {}", threadId, offsetKey, offset);
+ commitOffset(offsetRepository, partition, offset, true);
+ lastProcessedOffset.remove(offsetKey);
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
new file mode 100644
index 0000000..756a917
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.MemoryStateRepository;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Test;
+
+public class KafkaConsumerRebalancePartitionRevokeTest extends
BaseEmbeddedKafkaTest {
+ private static final String TOPIC = "offset-rebalance";
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ private OffsetStateRepository stateRepository;
+ private CountDownLatch messagesLatch;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @Override
+ protected void doPreSetup() throws Exception {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+
+ kafkaBroker.createTopic(TOPIC, 2);
+ for (int i = 0; i < 2; i++) {
+ producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-"
+ i));
+ }
+ messagesLatch = new CountDownLatch(1);
+ stateRepository = new OffsetStateRepository(messagesLatch);
+ }
+
+ @After
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ @Test
+ public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws
Exception {
+ boolean partitionRevokeCalled = messagesLatch.await(30000,
TimeUnit.MILLISECONDS);
+ assertTrue("StateRepository.setState should have been called with
offset >= 0 for topic" + TOPIC +
+ ". Remaining count : " + messagesLatch.getCount(),
partitionRevokeCalled);
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("offset", stateRepository);
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("kafka:" + TOPIC
+ + "?groupId=" + TOPIC + "_GROUP"
+ + "&autoCommitIntervalMs=1000"
+ + "&autoOffsetReset=earliest"
+ + "&consumersCount=1"
+ + "&offsetRepository=#offset")
+ .routeId("consumer-rebalance-route")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class OffsetStateRepository extends MemoryStateRepository {
+ CountDownLatch messagesLatch = null;
+
+ public OffsetStateRepository(CountDownLatch messagesLatch) {
+ this.messagesLatch = messagesLatch;
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void stop() throws Exception {
+ }
+
+ @Override
+ public String getState(String key) {
+ return super.getState(key);
+ }
+
+ @Override
+ public void setState(String key, String value) {
+ if (key.contains(TOPIC) && messagesLatch.getCount() > 0
+ && Long.parseLong(value) >= 0) {
+ messagesLatch.countDown();
+ }
+ super.setState(key, value);
+ }
+ }
+}