RYA-377 Fixed a bug where streams jobs would not resume where they had left off 
after being resumed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3ebf6db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3ebf6db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3ebf6db8

Branch: refs/heads/master
Commit: 3ebf6db80d83757f199ae708d18f5e61f29d45af
Parents: a51765e
Author: kchilton2 <[email protected]>
Authored: Tue Dec 12 20:12:12 2017 -0500
Committer: caleb <[email protected]>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../rya/streams/kafka/interactor/KafkaRunQuery.java      | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3ebf6db8/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
index e587998..aef7c58 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -25,7 +25,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -103,8 +103,13 @@ public class KafkaRunQuery implements RunQuery {
 
         // Setup the Kafka Stream program.
         final Properties streamsProps = new Properties();
-        streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
kafkaHostname + ":" + kafkaPort);
-        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" 
+ UUID.randomUUID());
+        streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaHostname + ":" + kafkaPort);
+
+        // Use the Query ID as the Application ID to ensure we resume where we 
left off the last time this command was run.
+        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" 
+ queryId);
+
+        // Always start at the beginning of the input topic.
+        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final KafkaStreams streams = new KafkaStreams(topologyBuilder, new 
StreamsConfig(streamsProps));
 

Reply via email to