This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new a49fc55  Bugfix kafka spout (#3275)
a49fc55 is described below

commit a49fc55005c86537a7516b682ea5d8fe748cdec8
Author: SiMing Weng <[email protected]>
AuthorDate: Fri May 24 13:31:03 2019 -0400

    Bugfix kafka spout (#3275)
    
    * update KafkaSpout
    
    * Revert "update KafkaSpout"
    
    This reverts commit dc64e9ad
    
    * no concurrent collection is needed
    fix a bug where a filtered consumer record is not acknowledged immediately
    discard the acknowledgement or failure when the partition has been revoked
    move code around to reduce method complexity
    
    * refactor to switch statement according to review comment
    
    * add return for better readability
---
 .../org/apache/heron/spouts/kafka/KafkaSpout.java  | 202 +++++++++++----------
 .../apache/heron/spouts/kafka/KafkaSpoutTest.java  |  27 ++-
 2 files changed, 124 insertions(+), 105 deletions(-)
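
The heart of the change is the ack bookkeeping: acknowledged offsets are coalesced into contiguous ranges in a NavigableMap keyed by range start, and because Heron drives a spout's nextTuple(), ack(), and fail() from a single thread, plain TreeMap/HashMap instances now replace the concurrent collections. A minimal sketch of the range-merging idea — class and method names here are illustrative, not the spout's actual API:

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.OptionalLong;
    import java.util.TreeMap;

    // Illustrative sketch only, not the spout's real ack registry.
    class AckRanges {
      // start offset -> end offset (inclusive) of each contiguous acked run
      private final NavigableMap<Long, Long> ranges = new TreeMap<>();

      void ack(long offset) {
        Map.Entry<Long, Long> floor = ranges.floorEntry(offset);
        Map.Entry<Long, Long> ceiling = ranges.ceilingEntry(offset);
        // duplicate ack (e.g. a replayed record after a seek): nothing to do
        if (floor != null && offset <= floor.getValue()) {
          return;
        }
        boolean joinsFloor = floor != null && offset == floor.getValue() + 1;
        boolean joinsCeiling = ceiling != null && offset == ceiling.getKey() - 1;
        if (joinsFloor && joinsCeiling) {
          // offset bridges two runs: merge them into one
          ranges.put(floor.getKey(), ceiling.getValue());
          ranges.remove(ceiling.getKey());
        } else if (joinsFloor) {
          ranges.put(floor.getKey(), offset);      // extend the run below
        } else if (joinsCeiling) {
          ranges.remove(ceiling.getKey());         // extend the run above
          ranges.put(offset, ceiling.getValue());
        } else {
          ranges.put(offset, offset);              // isolated new run
        }
      }

      // Next offset that is safe to commit: one past the end of the lowest run.
      OptionalLong committable() {
        Map.Entry<Long, Long> first = ranges.firstEntry();
        return first == null ? OptionalLong.empty()
                             : OptionalLong.of(first.getValue() + 1);
      }
    }

The same bookkeeping explains the filtered-record fix below: when the transformer drops a record, no tuple is ever acked for its offset, so the lowest run could never grow past the gap and commits would stall; emitConsumerRecord now acks such records synthetically to keep the runs contiguous.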

diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
index bc13d0a..215c84c 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -22,14 +22,15 @@ import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -52,7 +53,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 
 /**
 * Kafka spout to consume data from Kafka topic(s), each record is converted into a tuple via {@link ConsumerRecordTransformer}, and emitted into a topology
@@ -74,8 +74,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
   private transient TopologyContext topologyContext;
   private transient Queue<ConsumerRecord<K, V>> buffer;
   private transient Consumer<K, V> consumer;
-  private transient Set<TopicPartition> assignedPartitions;
   private transient Set<MetricName> reportedMetrics;
+  private transient Set<TopicPartition> assignedPartitions;
   private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
   private transient Map<TopicPartition, Long> failureRegistry;
   private Config.TopologyReliabilityMode topologyReliabilityMode =
@@ -101,7 +101,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout
    * @param kafkaConsumerFactory kafka consumer factory
    * @param topicPatternProvider provider of the topic matching pattern
    */
-  @SuppressWarnings("WeakerAccess")
   public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
                     TopicPatternProvider topicPatternProvider) {
     this.kafkaConsumerFactory = kafkaConsumerFactory;
@@ -130,6 +129,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout
   }
 
   @Override
+  public void open(Map<String, Object> conf, TopologyContext context,
+                   SpoutOutputCollector aCollector) {
+    this.collector = aCollector;
+    this.topologyContext = context;
+    initialize(conf);
+  }
+
+  @Override
   public void initState(State<TopicPartition, Long> aState) {
     this.state = aState;
     LOG.info("initial state {}", aState);
@@ -139,32 +146,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout
   public void preSave(String checkpointId) {
     LOG.info("save state {}", state);
     consumer.commitAsync(state.entrySet()
-        .stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, entry ->
-            new OffsetAndMetadata(entry.getValue() + 1))), null);
-  }
-
-  @Override
-  public void open(Map<String, Object> conf, TopologyContext context,
-                   SpoutOutputCollector aCollector) {
-    this.collector = aCollector;
-    this.topologyContext = context;
-    this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
-        conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
-    metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
-        .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
-        .getHeronMetricsExportInterval().getSeconds();
-    consumer = kafkaConsumerFactory.create();
-    if (topicNames != null) {
-      consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
-    } else {
-      consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
-    }
-    buffer = new ArrayDeque<>(500);
-    ackRegistry = new ConcurrentHashMap<>();
-    failureRegistry = new ConcurrentHashMap<>();
-    assignedPartitions = new HashSet<>();
-    reportedMetrics = new HashSet<>();
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey,
+                entry -> new OffsetAndMetadata(entry.getValue() + 1))),
+        null);
   }
 
   @Override
@@ -192,7 +177,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout
 
   @Override
   public void activate() {
-    super.activate();
     if (!assignedPartitions.isEmpty()) {
       consumer.resume(assignedPartitions);
     }
@@ -200,20 +184,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout
 
   @Override
   public void deactivate() {
-    super.deactivate();
     if (!assignedPartitions.isEmpty()) {
       consumer.pause(assignedPartitions);
     }
   }
 
+  @SuppressWarnings("Duplicates")
   @Override
   public void ack(Object msgId) {
-    super.ack(msgId);
     long start = System.nanoTime();
    ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
    TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+    if (!assignedPartitions.contains(topicPartition)) {
+      LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
+      return;
+    }
     long offset = consumerRecordMessageId.getOffset();
-    ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+    ackRegistry.putIfAbsent(topicPartition, new TreeMap<>());
     NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
 
     Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
@@ -226,7 +213,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout
 
     //the ack is for a message that has already been acknowledged.
     //This happens when a failed tuple has caused
-    //Kafka consumer to seek back to earlier position and some messages are replayed.
+    //Kafka consumer to seek back to earlier position, and some messages are replayed.
     if ((offset >= floorBottom && offset <= floorTop)
         || (offset >= ceilingBottom && offset <= ceilingTop)) {
       return;
@@ -254,12 +241,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout
 
   @Override
   public void fail(Object msgId) {
-    super.fail(msgId);
    ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
    TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+    if (!assignedPartitions.contains(topicPartition)) {
+      LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
+      return;
+    }
     long offset = consumerRecordMessageId.getOffset();
-    failureRegistry.put(topicPartition, Math.min(failureRegistry.getOrDefault(topicPartition,
-        Long.MAX_VALUE), offset));
+    failureRegistry.put(topicPartition,
+        Math.min(failureRegistry.getOrDefault(topicPartition,
+            Long.MAX_VALUE), offset));
     LOG.warn("fail {}", msgId);
   }
 
@@ -275,27 +266,64 @@ public class KafkaSpout<K, V> extends BaseRichSpout
             new Fields(consumerRecordTransformer.getFieldNames(s))));
   }
 
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  private void initialize(Map<String, Object> conf) {
+    topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
+        conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+    metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
+        .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
+        .getHeronMetricsExportInterval().getSeconds();
+    consumer = kafkaConsumerFactory.create();
+    if (topicNames != null) {
+      consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
+    } else {
+      consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
+    }
+    buffer = new ArrayDeque<>(500);
+    ackRegistry = new HashMap<>();
+    failureRegistry = new HashMap<>();
+    assignedPartitions = new HashSet<>();
+    reportedMetrics = new HashSet<>();
+  }
+
   private void emitConsumerRecord(ConsumerRecord<K, V> record) {
-    consumerRecordTransformer.transform(record)
-        .forEach((s, objects) -> {
-          if (topologyReliabilityMode != Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-            collector.emit(s, objects);
-            //only in effective once mode, we need to track the offset of the record that is just
-            //emitted into the topology
-            if (topologyReliabilityMode
-                == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
-              state.put(new TopicPartition(record.topic(), record.partition()),
-                  record.offset());
-            }
-          } else {
-            //build message id based on topic, partition, offset of the consumer record
-            ConsumerRecordMessageId consumerRecordMessageId =
-                new ConsumerRecordMessageId(new TopicPartition(record.topic(),
-                    record.partition()), record.offset());
-            //emit tuple with the message id
-            collector.emit(s, objects, consumerRecordMessageId);
-          }
-        });
+    Map<String, List<Object>> tupleByStream = consumerRecordTransformer.transform(record);
+    //nothing worth emitting out of this record,
+    //so immediately acknowledge it if in ATLEAST_ONCE mode
+    if (tupleByStream.isEmpty() && topologyReliabilityMode
+        == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+      ack(new ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()),
+          record.offset()));
+      return;
+    }
+    tupleByStream.forEach((s, objects) -> {
+      switch (topologyReliabilityMode) {
+        case ATMOST_ONCE:
+          collector.emit(s, objects);
+          break;
+        case ATLEAST_ONCE:
+          //build message id based on topic, partition, offset of the consumer record
+          ConsumerRecordMessageId consumerRecordMessageId =
+              new ConsumerRecordMessageId(new TopicPartition(record.topic(),
+                  record.partition()), record.offset());
+          //emit tuple with the message id
+          collector.emit(s, objects, consumerRecordMessageId);
+          break;
+        case EFFECTIVELY_ONCE:
+          collector.emit(s, objects);
+          //only in effective once mode, we need to track the offset of the record that is just
+          //emitted into the topology
+          state.put(new TopicPartition(record.topic(), record.partition()),
+              record.offset());
+          break;
+        default:
+          LOG.warn("unsupported reliability mode {}", topologyReliabilityMode);
+      }
+    });
   }
 
   private void rewindAndDiscardAck(TopicPartition topicPartition,
@@ -304,14 +332,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout
       long earliestFailedOffset = failureRegistry.get(topicPartition);
       //rewind back to the earliest failed offset
       consumer.seek(topicPartition, earliestFailedOffset);
-      //discard the ack whose offset is greater than the earliest failed offset if there
+      //discard the ack whose offset is greater than the earliest failed offset
+      //if there
       //is any because we've rewound the consumer back
      SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
       if (!sortedMap.isEmpty()) {
-        sortedMap.put(sortedMap.lastKey(), Math.min(earliestFailedOffset,
-            sortedMap.get(sortedMap.lastKey())));
+        sortedMap.put(sortedMap.lastKey(),
+            Math.min(earliestFailedOffset,
+                sortedMap.get(sortedMap.lastKey())));
       }
-      ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(sortedMap));
+      ackRegistry.put(topicPartition, new TreeMap<>(sortedMap));
       //failure for this partition has been dealt with
       failureRegistry.remove(topicPartition);
     }
@@ -319,7 +349,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
 
  private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
    //the first entry in the acknowledgement registry keeps track of the lowest possible
-    //offset that can be committed
+    //offset
+    //that can be committed
     Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
     if (firstEntry != null) {
       consumer.commitAsync(Collections.singletonMap(topicPartition,
@@ -330,11 +361,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout
   private Iterable<ConsumerRecord<K, V>> poll() {
     ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
     if (!records.isEmpty()) {
-      //since the Kafka Consumer metrics are built gradually based on the partitions it consumes,
-      //we need to periodically check whether there's any new metrics to register after
-      //each polling.
       if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
-          > metricsIntervalInSecs) {
+          > metricsIntervalInSecs * 1000) {
         registerConsumerMetrics();
         previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
       }
@@ -421,55 +449,35 @@ public class KafkaSpout<K, V> extends BaseRichSpout
     }
   }
 
-  class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
+  public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-      assignedPartitions.removeAll(collection);
+      LOG.info("revoked partitions {}", collection);
      if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
         collection.forEach(topicPartition -> {
-          NavigableMap<Long, Long> navigableMap = ackRegistry.remove(topicPartition);
-          if (navigableMap != null) {
-            Map.Entry<Long, Long> entry = navigableMap.firstEntry();
-            if (entry != null) {
-              consumer.commitAsync(Collections.singletonMap(topicPartition,
-                  new OffsetAndMetadata(Math.min(
-                      failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE),
-                      entry.getValue()) + 1)), null);
-            }
-          }
+          ackRegistry.remove(topicPartition);
+
           failureRegistry.remove(topicPartition);
         });
      } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
         collection.forEach(topicPartition -> state.remove(topicPartition));
       }
+      assignedPartitions.removeAll(collection);
     }
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-      assignedPartitions.addAll(collection);
-      if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-        collection.forEach(topicPartition -> {
-          try {
-            long nextRecordPosition = consumer.position(topicPartition,
-                Duration.ofSeconds(5));
-            ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(
-                Collections.singletonMap(nextRecordPosition - 1, nextRecordPosition - 1)
-            ));
-          } catch (TimeoutException e) {
-            LOG.warn("can not get the position of the next record to consume for partition {}",
-                topicPartition);
-            ackRegistry.remove(topicPartition);
-          }
-          failureRegistry.remove(topicPartition);
-        });
-      } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+
+      LOG.info("assigned partitions {}", collection);
      if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
         collection.forEach(topicPartition -> {
           if (state.containsKey(topicPartition)) {
             consumer.seek(topicPartition, state.get(topicPartition));
           }
         });
       }
+      assignedPartitions.addAll(collection);
     }
   }
 }
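
A note on the rebalance handling above: revocation now simply clears the per-partition registries and the ownership set, and ack()/fail() drop message ids for partitions that are no longer owned, while assignment in EFFECTIVELY_ONCE mode seeks back to the checkpointed offset before ownership is restored. A rough, self-contained sketch of that listener shape, assuming a plain Map as the checkpoint store (in the spout it is the State<TopicPartition, Long> instance):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch only; field and class names are assumptions.
    class CheckpointRebalanceListener implements ConsumerRebalanceListener {
      private final Consumer<?, ?> consumer;
      private final Map<TopicPartition, Long> checkpoint = new HashMap<>();
      private final Set<TopicPartition> owned = new HashSet<>();

      CheckpointRebalanceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
      }

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // forget bookkeeping for partitions we no longer own, then drop
        // ownership, so a late ack()/fail() for them is ignored
        partitions.forEach(checkpoint::remove);
        owned.removeAll(partitions);
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
          Long offset = checkpoint.get(tp);
          if (offset != null) {
            consumer.seek(tp, offset);  // resume from the checkpointed offset
          }
        }
        owned.addAll(partitions);
      }

      boolean owns(TopicPartition tp) {
        return owned.contains(tp);
      }
    }

Restoring ownership only after seeking means an ack that arrives mid-rebalance is discarded rather than committed against the wrong position.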
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
index dce386a..0c8a97a 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -38,6 +38,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 import org.apache.heron.api.Config;
 import org.apache.heron.api.metric.IMetric;
 import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Fields;
@@ -55,6 +56,7 @@ import org.apache.kafka.common.TopicPartition;
 
 import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
 import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -93,6 +95,8 @@ public class KafkaSpoutTest {
   private ArgumentCaptor<List<Object>> listArgumentCaptor;
   @Captor
  private ArgumentCaptor<ConsumerRebalanceListener> consumerRebalanceListenerArgumentCaptor;
+  @Mock
+  private State<TopicPartition, Long> state;
 
   @BeforeClass
   public static void setUpAll() {
@@ -194,6 +198,11 @@ public class KafkaSpoutTest {
 
     kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
         ATLEAST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
     //poll the topic
     kafkaSpout.nextTuple();
     //emit all of the five records
@@ -228,6 +237,11 @@ public class KafkaSpoutTest {
 
     kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
         ATLEAST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
     //poll the topic
     kafkaSpout.nextTuple();
     //emit all of the five records
@@ -265,23 +279,20 @@ public class KafkaSpoutTest {
 
   @Test
   public void consumerRebalanceListener() {
+    kafkaSpout.initState(state);
     when(kafkaConsumerFactory.create()).thenReturn(consumer);
 
     kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
-        ATLEAST_ONCE.name()), topologyContext, collector);
+        EFFECTIVELY_ONCE.name()), topologyContext, collector);
     verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
         consumerRebalanceListenerArgumentCaptor.capture());
     ConsumerRebalanceListener consumerRebalanceListener =
         consumerRebalanceListenerArgumentCaptor.getValue();
     TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    when(state.get(topicPartition)).thenReturn(5L);
+    when(state.containsKey(topicPartition)).thenReturn(true);
    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
-    verify(consumer).position(topicPartition, Duration.ofSeconds(5));
-
-    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
-    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
-    consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
-    verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
-        new OffsetAndMetadata(2)), null);
+    verify(consumer).seek(topicPartition, 5L);
   }
 
   @Test
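
For readers skimming the test changes: with no broker available, the tests verify subscribe() with an ArgumentCaptor, pull out the rebalance listener the spout registered, and fire the assignment callback by hand. A self-contained sketch of that captor pattern, using placeholder names and Mockito 2-style imports:

    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;
    import org.mockito.ArgumentCaptor;

    public class RebalanceCaptorSketch {

      @SuppressWarnings("unchecked")
      static void demo() {
        Consumer<String, String> consumer = mock(Consumer.class);

        // stand-in for the code under test, which registers a listener
        // when it subscribes
        consumer.subscribe(Collections.singleton("dummy-topic"),
            new ConsumerRebalanceListener() {
              @Override
              public void onPartitionsRevoked(Collection<TopicPartition> parts) { }

              @Override
              public void onPartitionsAssigned(Collection<TopicPartition> parts) { }
            });

        // capture whatever listener was registered on the mock consumer
        ArgumentCaptor<ConsumerRebalanceListener> captor =
            ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        verify(consumer).subscribe(eq(Collections.singleton("dummy-topic")),
            captor.capture());

        // drive a rebalance by hand; no broker involved
        captor.getValue().onPartitionsAssigned(
            Collections.singleton(new TopicPartition("dummy-topic", 0)));
      }
    }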
