RYA-377 Fixed a bug where streams jobs would not resume where they had left off after being restarted.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3ebf6db8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3ebf6db8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3ebf6db8 Branch: refs/heads/master Commit: 3ebf6db80d83757f199ae708d18f5e61f29d45af Parents: a51765e Author: kchilton2 <[email protected]> Authored: Tue Dec 12 20:12:12 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/kafka/interactor/KafkaRunQuery.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3ebf6db8/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java index e587998..aef7c58 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java @@ -25,7 +25,7 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -103,8 +103,13 @@ public class KafkaRunQuery implements RunQuery { // Setup the Kafka Stream program. 
final Properties streamsProps = new Properties(); - streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort); - streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID()); + streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort); + + // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. + streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + queryId); + + // Always start at the beginning of the input topic. + streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
