This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new a49fc55 Bugfix kafka spout (#3275)
a49fc55 is described below
commit a49fc55005c86537a7516b682ea5d8fe748cdec8
Author: SiMing Weng <[email protected]>
AuthorDate: Fri May 24 13:31:03 2019 -0400
Bugfix kafka spout (#3275)
* update KafkaSpout
* Revert "update KafkaSpout"
This reverts commit dc64e9ad
* no concurrent collection is needed
fix a bug where a filtered consumer record was not acknowledged immediately
discard the acknowledgement or failure when the partition has been revoked
move code around to reduce method complexity
* refactor to switch statement according to review comment
* add return for better readability
---
.../org/apache/heron/spouts/kafka/KafkaSpout.java | 202 +++++++++++----------
.../apache/heron/spouts/kafka/KafkaSpoutTest.java | 27 ++-
2 files changed, 124 insertions(+), 105 deletions(-)
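
For readers skimming the patch: the core behavioral change is a guard in ack() and fail() that discards callbacks for partitions a rebalance has revoked, plus the swap of concurrent collections for plain ones. Below is a minimal standalone sketch of that guard; the field names mirror the diff, but the wrapper class itself is illustrative only, not Heron API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

import org.apache.kafka.common.TopicPartition;

// Standalone sketch of the revoked-partition guard described above. The field
// names mirror KafkaSpout, but this wrapper class is illustrative only.
class RevokedPartitionGuardSketch {
  private final Set<TopicPartition> assignedPartitions = new HashSet<>();
  private final Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry = new HashMap<>();

  void ack(TopicPartition topicPartition, long offset) {
    // Discard the acknowledgement when the partition has been revoked: after a
    // rebalance another consumer owns it, so recording the ack would be wrong.
    if (!assignedPartitions.contains(topicPartition)) {
      return;
    }
    // Spout callbacks run on a single thread, so a plain TreeMap replaces the
    // ConcurrentSkipListMap ("no concurrent collection is needed").
    ackRegistry.computeIfAbsent(topicPartition, tp -> new TreeMap<>())
        .put(offset, offset);
  }
}
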
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
index bc13d0a..215c84c 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -22,14 +22,15 @@ import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -52,7 +53,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
/**
* Kafka spout to consume data from Kafka topic(s), each record is converted into a tuple via {@link ConsumerRecordTransformer}, and emitted into a topology
@@ -74,8 +74,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
private transient TopologyContext topologyContext;
private transient Queue<ConsumerRecord<K, V>> buffer;
private transient Consumer<K, V> consumer;
- private transient Set<TopicPartition> assignedPartitions;
private transient Set<MetricName> reportedMetrics;
+ private transient Set<TopicPartition> assignedPartitions;
private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
private transient Map<TopicPartition, Long> failureRegistry;
private Config.TopologyReliabilityMode topologyReliabilityMode =
@@ -101,7 +101,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout
* @param kafkaConsumerFactory kafka consumer factory
* @param topicPatternProvider provider of the topic matching pattern
*/
- @SuppressWarnings("WeakerAccess")
public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
TopicPatternProvider topicPatternProvider) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
@@ -130,6 +129,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout
}
@Override
+ public void open(Map<String, Object> conf, TopologyContext context,
+ SpoutOutputCollector aCollector) {
+ this.collector = aCollector;
+ this.topologyContext = context;
+ initialize(conf);
+ }
+
+ @Override
public void initState(State<TopicPartition, Long> aState) {
this.state = aState;
LOG.info("initial state {}", aState);
@@ -139,32 +146,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout
public void preSave(String checkpointId) {
LOG.info("save state {}", state);
consumer.commitAsync(state.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry ->
- new OffsetAndMetadata(entry.getValue() + 1))), null);
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context,
- SpoutOutputCollector aCollector) {
- this.collector = aCollector;
- this.topologyContext = context;
- this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
- conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
- metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
- .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
- .getHeronMetricsExportInterval().getSeconds();
- consumer = kafkaConsumerFactory.create();
- if (topicNames != null) {
- consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
- } else {
- consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
- }
- buffer = new ArrayDeque<>(500);
- ackRegistry = new ConcurrentHashMap<>();
- failureRegistry = new ConcurrentHashMap<>();
- assignedPartitions = new HashSet<>();
- reportedMetrics = new HashSet<>();
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ entry -> new OffsetAndMetadata(entry.getValue() + 1))),
+ null);
}
@Override
@@ -192,7 +177,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout
@Override
public void activate() {
- super.activate();
if (!assignedPartitions.isEmpty()) {
consumer.resume(assignedPartitions);
}
@@ -200,20 +184,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout
@Override
public void deactivate() {
- super.deactivate();
if (!assignedPartitions.isEmpty()) {
consumer.pause(assignedPartitions);
}
}
+ @SuppressWarnings("Duplicates")
@Override
public void ack(Object msgId) {
- super.ack(msgId);
long start = System.nanoTime();
ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+ if (!assignedPartitions.contains(topicPartition)) {
+ LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
+ return;
+ }
long offset = consumerRecordMessageId.getOffset();
- ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+ ackRegistry.putIfAbsent(topicPartition, new TreeMap<>());
NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
@@ -226,7 +213,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout
//the ack is for a message that has already been acknowledged.
//This happens when a failed tuple has caused
- //Kafka consumer to seek back to earlier position and some messages are replayed.
+ //Kafka consumer to seek back to earlier position, and some messages are replayed.
if ((offset >= floorBottom && offset <= floorTop)
|| (offset >= ceilingBottom && offset <= ceilingTop)) {
return;
@@ -254,12 +241,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout
@Override
public void fail(Object msgId) {
- super.fail(msgId);
ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+ if (!assignedPartitions.contains(topicPartition)) {
+ LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
+ return;
+ }
long offset = consumerRecordMessageId.getOffset();
- failureRegistry.put(topicPartition, Math.min(failureRegistry.getOrDefault(topicPartition,
- Long.MAX_VALUE), offset));
+ failureRegistry.put(topicPartition,
+ Math.min(failureRegistry.getOrDefault(topicPartition,
+ Long.MAX_VALUE), offset));
LOG.warn("fail {}", msgId);
}
@@ -275,27 +266,64 @@ public class KafkaSpout<K, V> extends BaseRichSpout
new Fields(consumerRecordTransformer.getFieldNames(s))));
}
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ private void initialize(Map<String, Object> conf) {
+ topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
+ conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+ metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
+ .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
+ .getHeronMetricsExportInterval().getSeconds();
+ consumer = kafkaConsumerFactory.create();
+ if (topicNames != null) {
+ consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
+ } else {
+ consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
+ }
+ buffer = new ArrayDeque<>(500);
+ ackRegistry = new HashMap<>();
+ failureRegistry = new HashMap<>();
+ assignedPartitions = new HashSet<>();
+ reportedMetrics = new HashSet<>();
+ }
+
private void emitConsumerRecord(ConsumerRecord<K, V> record) {
- consumerRecordTransformer.transform(record)
- .forEach((s, objects) -> {
- if (topologyReliabilityMode != Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- collector.emit(s, objects);
- //only in effective once mode, we need to track the offset of the record that is just
- //emitted into the topology
- if (topologyReliabilityMode
- == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
- state.put(new TopicPartition(record.topic(), record.partition()),
- record.offset());
- }
- } else {
- //build message id based on topic, partition, offset of the consumer record
- ConsumerRecordMessageId consumerRecordMessageId =
- new ConsumerRecordMessageId(new TopicPartition(record.topic(),
- record.partition()), record.offset());
- //emit tuple with the message id
- collector.emit(s, objects, consumerRecordMessageId);
- }
- });
+ Map<String, List<Object>> tupleByStream = consumerRecordTransformer.transform(record);
+ //nothing worth emitting out of this record,
+ //so immediately acknowledge it if in ATLEAST_ONCE mode
+ if (tupleByStream.isEmpty() && topologyReliabilityMode
+ == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ ack(new ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()),
+ record.offset()));
+ return;
+ }
+ tupleByStream.forEach((s, objects) -> {
+ switch (topologyReliabilityMode) {
+ case ATMOST_ONCE:
+ collector.emit(s, objects);
+ break;
+ case ATLEAST_ONCE:
+ //build message id based on topic, partition, offset of the consumer record
+ ConsumerRecordMessageId consumerRecordMessageId =
+ new ConsumerRecordMessageId(new TopicPartition(record.topic(),
+ record.partition()), record.offset());
+ //emit tuple with the message id
+ collector.emit(s, objects, consumerRecordMessageId);
+ break;
+ case EFFECTIVELY_ONCE:
+ collector.emit(s, objects);
+ //only in effective once mode, we need to track the offset of the record that is just
+ //emitted into the topology
+ state.put(new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ break;
+ default:
+ LOG.warn("unsupported reliability mode {}", topologyReliabilityMode);
+ }
+ });
}
private void rewindAndDiscardAck(TopicPartition topicPartition,
@@ -304,14 +332,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout
long earliestFailedOffset = failureRegistry.get(topicPartition);
//rewind back to the earliest failed offset
consumer.seek(topicPartition, earliestFailedOffset);
- //discard the ack whose offset is greater than the earliest failed offset if there
+ //discard the ack whose offset is greater than the earliest failed offset
+ //if there
//is any because we've rewound the consumer back
SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
if (!sortedMap.isEmpty()) {
- sortedMap.put(sortedMap.lastKey(), Math.min(earliestFailedOffset,
- sortedMap.get(sortedMap.lastKey())));
+ sortedMap.put(sortedMap.lastKey(),
+ Math.min(earliestFailedOffset,
+ sortedMap.get(sortedMap.lastKey())));
}
- ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(sortedMap));
+ ackRegistry.put(topicPartition, new TreeMap<>(sortedMap));
//failure for this partition has been dealt with
failureRegistry.remove(topicPartition);
}
@@ -319,7 +349,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
//the first entry in the acknowledgement registry keeps track of the lowest possible
- //offset that can be committed
+ //offset
+ //that can be committed
Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
if (firstEntry != null) {
consumer.commitAsync(Collections.singletonMap(topicPartition,
@@ -330,11 +361,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
private Iterable<ConsumerRecord<K, V>> poll() {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
if (!records.isEmpty()) {
- //since the Kafka Consumer metrics are built gradually based on the partitions it consumes,
- //we need to periodically check whether there's any new metrics to register after
- //each polling.
if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
- > metricsIntervalInSecs) {
+ > metricsIntervalInSecs * 1000) {
registerConsumerMetrics();
previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
}
@@ -421,55 +449,35 @@ public class KafkaSpout<K, V> extends BaseRichSpout
}
}
- class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
+ public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
- assignedPartitions.removeAll(collection);
+ LOG.info("revoked partitions {}", collection);
if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
collection.forEach(topicPartition -> {
- NavigableMap<Long, Long> navigableMap = ackRegistry.remove(topicPartition);
- if (navigableMap != null) {
- Map.Entry<Long, Long> entry = navigableMap.firstEntry();
- if (entry != null) {
- consumer.commitAsync(Collections.singletonMap(topicPartition,
- new OffsetAndMetadata(Math.min(
- failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE),
- entry.getValue()) + 1)), null);
- }
- }
+ ackRegistry.remove(topicPartition);
+
failureRegistry.remove(topicPartition);
});
} else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
collection.forEach(topicPartition -> state.remove(topicPartition));
}
+ assignedPartitions.removeAll(collection);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
- assignedPartitions.addAll(collection);
- if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- collection.forEach(topicPartition -> {
- try {
- long nextRecordPosition = consumer.position(topicPartition,
- Duration.ofSeconds(5));
- ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(
- Collections.singletonMap(nextRecordPosition - 1, nextRecordPosition - 1)
- ));
- } catch (TimeoutException e) {
- LOG.warn("can not get the position of the next record to consume for partition {}",
- topicPartition);
- ackRegistry.remove(topicPartition);
- }
- failureRegistry.remove(topicPartition);
- });
- } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+
+ LOG.info("assigned partitions {}", collection);
+ if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
collection.forEach(topicPartition -> {
if (state.containsKey(topicPartition)) {
consumer.seek(topicPartition, state.get(topicPartition));
}
});
}
+ assignedPartitions.addAll(collection);
}
}
}
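
One easy-to-miss fix in the poll() hunk above is a units mismatch: the elapsed time is measured in milliseconds while metricsIntervalInSecs is in seconds, so the interval is now scaled by 1000. A minimal sketch of the corrected check follows; field names are taken from the diff, but the wrapper class is illustrative only.

// Sketch of the units fix in poll(): elapsed time comes from
// System.currentTimeMillis(), but the configured interval is in seconds, so
// it must be scaled by 1000 before the comparison.
class MetricsIntervalSketch {
  private int metricsIntervalInSecs = 60;
  private long previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();

  void maybeRegisterConsumerMetrics() {
    long now = System.currentTimeMillis();
    if (now - previousKafkaMetricsUpdatedTimestamp > metricsIntervalInSecs * 1000L) {
      // Kafka consumer metrics appear gradually as partitions are consumed,
      // so re-check for newly appeared metrics after each successful poll.
      previousKafkaMetricsUpdatedTimestamp = now;
    }
  }
}
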
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
index dce386a..0c8a97a 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -38,6 +38,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.apache.heron.api.Config;
import org.apache.heron.api.metric.IMetric;
import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
@@ -55,6 +56,7 @@ import org.apache.kafka.common.TopicPartition;
import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -93,6 +95,8 @@ public class KafkaSpoutTest {
private ArgumentCaptor<List<Object>> listArgumentCaptor;
@Captor
private ArgumentCaptor<ConsumerRebalanceListener> consumerRebalanceListenerArgumentCaptor;
+ @Mock
+ private State<TopicPartition, Long> state;
@BeforeClass
public static void setUpAll() {
@@ -194,6 +198,11 @@ public class KafkaSpoutTest {
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
//poll the topic
kafkaSpout.nextTuple();
//emit all of the five records
@@ -228,6 +237,11 @@ public class KafkaSpoutTest {
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
//poll the topic
kafkaSpout.nextTuple();
//emit all of the five records
@@ -265,23 +279,20 @@ public class KafkaSpoutTest {
@Test
public void consumerRebalanceListener() {
+ kafkaSpout.initState(state);
when(kafkaConsumerFactory.create()).thenReturn(consumer);
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
- ATLEAST_ONCE.name()), topologyContext, collector);
+ EFFECTIVELY_ONCE.name()), topologyContext, collector);
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ when(state.get(topicPartition)).thenReturn(5L);
+ when(state.containsKey(topicPartition)).thenReturn(true);
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
- verify(consumer).position(topicPartition, Duration.ofSeconds(5));
-
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
-
consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
- verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
- new OffsetAndMetadata(2)), null);
+ verify(consumer).seek(topicPartition, 5L);
}
@Test