simingweng commented on a change in pull request #3275: Bugfix kafka spout
URL: https://github.com/apache/incubator-heron/pull/3275#discussion_r287205008
 
 

 ##########
 File path: 
contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
 ##########
 @@ -275,27 +266,58 @@ public void declareOutputFields(OutputFieldsDeclarer 
declarer) {
             new Fields(consumerRecordTransformer.getFieldNames(s))));
   }
 
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  private void initialize(Map<String, Object> conf) {
+    topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
+        conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+    metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
+        .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
+        .getHeronMetricsExportInterval().getSeconds();
+    consumer = kafkaConsumerFactory.create();
+    if (topicNames != null) {
+      consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
+    } else {
+      consumer.subscribe(topicPatternProvider.create(), new 
KafkaConsumerRebalanceListener());
+    }
+    buffer = new ArrayDeque<>(500);
+    ackRegistry = new HashMap<>();
+    failureRegistry = new HashMap<>();
+    assignedPartitions = new HashSet<>();
+    reportedMetrics = new HashSet<>();
+  }
+
   private void emitConsumerRecord(ConsumerRecord<K, V> record) {
-    consumerRecordTransformer.transform(record)
-        .forEach((s, objects) -> {
-          if (topologyReliabilityMode != 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-            collector.emit(s, objects);
-            //only in effective once mode, we need to track the offset of the 
record that is just
-            //emitted into the topology
-            if (topologyReliabilityMode
-                == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
-              state.put(new TopicPartition(record.topic(), record.partition()),
-                  record.offset());
-            }
-          } else {
-            //build message id based on topic, partition, offset of the 
consumer record
-            ConsumerRecordMessageId consumerRecordMessageId =
-                new ConsumerRecordMessageId(new TopicPartition(record.topic(),
-                    record.partition()), record.offset());
-            //emit tuple with the message id
-            collector.emit(s, objects, consumerRecordMessageId);
-          }
-        });
+    Map<String, List<Object>> tupleByStream = 
consumerRecordTransformer.transform(record);
+    //nothing worth emitting out of this record,
+    //so immediately acknowledge it if in ATLEAST_ONCE mode
+    if (tupleByStream.isEmpty() && topologyReliabilityMode
+        == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+      ack(new ConsumerRecordMessageId(new TopicPartition(record.topic(), 
record.partition()),
+          record.offset()));
+    }
+    tupleByStream.forEach((s, objects) -> {
+      if (topologyReliabilityMode != 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
 
 Review comment:
   addressed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to