Repository: metron Updated Branches: refs/heads/master 64473d4e8 -> 356881ad2
METRON-966: Pcap topology does not commit offsets closes apache/incubator-metron#597 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/356881ad Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/356881ad Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/356881ad Branch: refs/heads/master Commit: 356881ad22edc7c08d8a8f3812192333210f3c8e Parents: 64473d4 Author: cstella <[email protected]> Authored: Sat May 20 09:59:02 2017 -0400 Committer: cstella <[email protected]> Committed: Sat May 20 09:59:02 2017 -0400 ---------------------------------------------------------------------- .../metron/spout/pcap/KafkaToHDFSSpout.java | 50 ++++++++++++++++++++ pom.xml | 18 +------ 2 files changed, 51 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java index ddfd14a..5b3d6f6 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java @@ -22,9 +22,20 @@ import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.storm.kafka.Callback; import org.apache.storm.kafka.CallbackKafkaSpout; +import java.util.ArrayList; +import java.util.List; + public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> { static final long serialVersionUID = 0xDEADBEEFL; HDFSWriterConfig config = null; + + private static 
ThreadLocal<List<Object>> messagesToBeAcked = new ThreadLocal<List<Object>>() { + @Override + protected List<Object> initialValue() { + return new ArrayList<>(); + } + }; + public KafkaToHDFSSpout( SimpleStormKafkaBuilder<byte[], byte[]> spoutConfig , HDFSWriterConfig config ) @@ -40,5 +51,44 @@ public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> { return new HDFSWriterCallback().withConfig(config); } + /** + * Clear all the messages that are queued to be acked. + */ + private void clearMessagesToBeAcked() { + for (Object messageId : messagesToBeAcked.get()) { + super.ack(messageId); + } + messagesToBeAcked.get().clear(); + } + @Override + public void nextTuple() { + /* + This bears some explanation; nextTuple for a spout-only topology sans ackers, will ack as part of the emit method. + The unfortunate part about this is that this will prevent the internal bookkeeping of the KafkaSpout from adding the + message ID to the offsets to commit. This is because it thinks it is not emitted by the time it gets to ack (because + ack is called *within* emit). The result is that no offsets are acked. + + What we have here is a correction. The ack method will add the message ID to a queue to be acked and then at the end + of nextTuple, we will clear the cache and ack. The net result is that the contract is adhered to for spout-only topologies, + ack happens in nextTuple(). 
+ */ + super.nextTuple(); + clearMessagesToBeAcked(); + } + + @Override + public void ack(Object messageId) { + messagesToBeAcked.get().add(messageId); + } + + @Override + public void close() { + try { + clearMessagesToBeAcked(); + } + finally { + super.close(); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9f62249..4bbd8ae 100644 --- a/pom.xml +++ b/pom.xml @@ -39,22 +39,6 @@ <repositories> <repository> - <releases> - <enabled>true</enabled> - <updatePolicy>always</updatePolicy> - <checksumPolicy>warn</checksumPolicy> - </releases> - <snapshots> - <enabled>true</enabled> - <updatePolicy>never</updatePolicy> - <checksumPolicy>warn</checksumPolicy> - </snapshots> - <id>HDPPrivateReleases</id> - <name>HDP Private Releases</name> - <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url> - <layout>default</layout> - </repository> - <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> @@ -131,7 +115,7 @@ <properties> <hdp_version>2.5.0.0</hdp_version> <build_number>1245</build_number> - <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version> + <global_storm_kafka_version>1.1.0</global_storm_kafka_version> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> </properties>
