RYA-377 Implement the stream query command in the client and fix a bunch of 
client bugs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f3ac7df1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f3ac7df1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f3ac7df1

Branch: refs/heads/master
Commit: f3ac7df1e9e707eb2e28e1c1c70212b89b716163
Parents: fc9775e
Author: kchilton2 <kevin.e.chil...@gmail.com>
Authored: Thu Nov 2 12:10:31 2017 -0400
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../api/queries/InMemoryQueryChangeLog.java     |   5 +
 .../api/queries/InMemoryQueryRepository.java    |  10 ++
 .../rya/streams/api/queries/QueryChangeLog.java |   2 +-
 .../streams/api/queries/QueryRepository.java    |   2 +-
 .../apache/rya/streams/client/CLIDriver.java    |  40 ++++--
 .../rya/streams/client/RyaStreamsCommand.java   |   8 ++
 .../streams/client/command/AddQueryCommand.java |  77 ++++------
 .../client/command/DeleteQueryCommand.java      |  72 ++++------
 .../client/command/ListQueriesCommand.java      |  70 ++++-----
 .../client/command/LoadStatementsCommand.java   |  25 ++--
 .../client/command/StreamResultsCommand.java    | 141 +++++++++++++++++++
 .../client/src/main/resources/log4j.properties  |  27 ++++
 .../client/command/AddQueryCommandIT.java       |   3 +-
 .../client/command/DeleteQueryCommandIT.java    |   3 +-
 .../client/command/ListQueryCommandIT.java      |   3 +-
 .../kafka/queries/KafkaQueryChangeLog.java      |  30 +++-
 .../queries/KafkaQueryChangeLogFactory.java     |  74 ++++++++++
 17 files changed, 427 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java
index 71af850..f0f628e 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java
@@ -71,6 +71,11 @@ public class InMemoryQueryChangeLog implements 
QueryChangeLog {
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        // Nothing to do here.
+    }
+
     /**
      * A {@link CloseableIteration} that iterates over a list.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 652b16f..c1048fc 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -131,6 +131,16 @@ public class InMemoryQueryRepository implements 
QueryRepository {
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        lock.lock();
+        try {
+            changeLog.close();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * A {@link Map} from query id to the {@link StreamsQuery} that is 
represented by that id based on what
      * is already in a {@link QueryChangeLog}.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
index 824eebc..5765366 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
@@ -26,7 +26,7 @@ import info.aduna.iteration.CloseableIteration;
  * An ordered log of all of the changes that have been applied to the SPARQL 
Queries that are managed by Rya Streams.
  */
 @DefaultAnnotation(NonNull.class)
-public interface QueryChangeLog {
+public interface QueryChangeLog extends AutoCloseable {
 
     /**
      * Write a new {@link QueryChange} to the end of the change log.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 54f98fc..850b2bc 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -31,7 +31,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * Repository for adding, deleting, and listing active queries in Rya Streams.
  */
 @DefaultAnnotation(NonNull.class)
-public interface QueryRepository {
+public interface QueryRepository extends AutoCloseable {
     /**
      * Adds a new query to Rya Streams.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
index 93df2ae..5c0816f 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
@@ -26,9 +26,11 @@ import java.util.Set;
 
 import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
 import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException;
+import org.apache.rya.streams.client.command.AddQueryCommand;
+import org.apache.rya.streams.client.command.DeleteQueryCommand;
+import org.apache.rya.streams.client.command.ListQueriesCommand;
 import org.apache.rya.streams.client.command.LoadStatementsCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rya.streams.client.command.StreamResultsCommand;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -37,23 +39,31 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
- * CLI tool for interacting with rya streams.
- * <p>
+ * CLI tool for interacting with Rya Streams.
+ * </p>
  * This tool can be used to:
  * <ul>
- * <li>Load a file of statements into rya streams</li>
+ *   <li>Add a Query to Rya Streams</li>
+ *   <li>Delete a Query from Rya Streams</li>
+ *   <li>List the Queries that are being managed by Rya Streams</li>
+ *   <li>Load a file of RDF Statements into Rya Streams</li>
+ *   <li>Stream the results of a Query to the console</li>
  * </ul>
  */
 @DefaultAnnotation(NonNull.class)
 public class CLIDriver {
-    private static final Logger LOG = LoggerFactory.getLogger(CLIDriver.class);
+
     /**
      * Maps from command strings to the object that performs the command.
      */
     private static final ImmutableMap<String, RyaStreamsCommand> COMMANDS;
     static {
         final Set<Class<? extends RyaStreamsCommand>> commandClasses = new 
HashSet<>();
+        commandClasses.add(AddQueryCommand.class);
+        commandClasses.add(DeleteQueryCommand.class);
+        commandClasses.add(ListQueriesCommand.class);
         commandClasses.add(LoadStatementsCommand.class);
+        commandClasses.add(StreamResultsCommand.class);
         final ImmutableMap.Builder<String, RyaStreamsCommand> builder = 
ImmutableMap.builder();
         for(final Class<? extends RyaStreamsCommand> commandClass : 
commandClasses) {
             try {
@@ -70,10 +80,7 @@ public class CLIDriver {
     private static final String USAGE = makeUsage(COMMANDS);
 
     public static void main(final String[] args) {
-        LOG.trace("Starting up the Rya Streams Client.");
-
-        // If no command provided or the command isn't recognized, then print
-        // the usage.
+        // If no command provided or the command isn't recognized, then print 
the usage.
         if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
             System.out.println(USAGE);
             System.exit(1);
@@ -84,14 +91,19 @@ public class CLIDriver {
         final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
         final RyaStreamsCommand streamsCommand = COMMANDS.get(command);
 
+        // Print usage if the arguments are invalid for the command.
+        if(!streamsCommand.validArguments(commandArgs)) {
+            System.out.println(streamsCommand.getUsage());
+            System.exit(1);
+        }
+
         // Execute the command.
         try {
             streamsCommand.execute(commandArgs);
         } catch (ArgumentsException | ExecutionException e) {
-            LOG.error("The command: " + command + " failed to execute 
properly.", e);
+            System.err.println("The command: " + command + " failed to execute 
properly.");
+            e.printStackTrace();
             System.exit(2);
-        } finally {
-            LOG.trace("Shutting down the Rya Streams Client.");
         }
     }
 
@@ -119,4 +131,4 @@ public class CLIDriver {
 
         return usage.toString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
index 5d64785..5b05d0a 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
@@ -89,6 +89,14 @@ public interface RyaStreamsCommand {
     }
 
     /**
+     * Validates a set of arguments that may be passed into the command.
+     *
+     * @param args - The arguments that will be validated. (not null)
+     * @return {@code true} if the arguments are valid, otherwise {@code 
false}.
+     */
+    public boolean validArguments(String[] args);
+
+    /**
      * Execute the command using the command line arguments.
      *
      * @param args - Command line arguments that configure how the command 
will execute. (not null)

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 8439f20..c72e6a2 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -20,32 +20,16 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.Properties;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.AddQuery;
 import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
-import org.apache.rya.streams.api.queries.QueryChange;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -60,7 +44,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public class AddQueryCommand implements RyaStreamsCommand {
-    private static final Logger log = 
LoggerFactory.getLogger(AddQueryCommand.class);
 
     /**
      * Command line parameters that are used by this command to configure 
itself.
@@ -73,11 +56,9 @@ public class AddQueryCommand implements RyaStreamsCommand {
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
-            parameters.append("\n");
 
             if (!Strings.isNullOrEmpty(query)) {
-                parameters.append("\tQuery: " + query);
-                parameters.append("\n");
+                parameters.append("\tQuery: " + query + "\n");
             }
             return parameters.toString();
         }
@@ -103,6 +84,17 @@ public class AddQueryCommand implements RyaStreamsCommand {
     }
 
     @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new AddParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
     public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
         requireNonNull(args);
 
@@ -113,34 +105,27 @@ public class AddQueryCommand implements RyaStreamsCommand 
{
         } catch(final ParameterException e) {
             throw new ArgumentsException("Could not add a new query because of 
invalid command line parameters.", e);
         }
-        log.trace("Executing the Add Query Command\n" + params.toString());
 
-        // Create properties for interacting with Kafka.
-        final Properties producerProperties = new Properties();
-        
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
-
-        final Producer<?, QueryChange> queryProducer = new 
KafkaProducer<>(producerProperties);
-        final Consumer<?, QueryChange> queryConsumer = new 
KafkaConsumer<>(consumerProperties);
-
-        final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, 
KafkaTopics.queryChangeLogTopic(params.ryaInstance));
-        final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+        // Create the Kafka backed QueryChangeLog.
+        final String bootstrapServers = params.kafkaIP + ":" + 
params.kafkaPort;
+        final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+        final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
         // Execute the add query command.
-        final AddQuery addQuery = new DefaultAddQuery(repo);
-        try {
-            final StreamsQuery query = addQuery.addQuery(params.query);
-            log.trace("Added query: " + query.getSparql());
-        } catch (final RyaStreamsException e) {
-            log.error("Unable to parse query: " + params.query, e);
+        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+            final AddQuery addQuery = new DefaultAddQuery(queryRepo);
+            try {
+                final StreamsQuery query = addQuery.addQuery(params.query);
+                System.out.println("Added query: " + query.getSparql());
+            } catch (final RyaStreamsException e) {
+                System.err.println("Unable to parse query: " + params.query);
+                e.printStackTrace();
+                System.exit(1);
+            }
+        } catch (final Exception e) {
+            System.err.println("Problem encountered while closing the 
QueryRepository.");
+            e.printStackTrace();
+            System.exit(1);
         }
-
-        log.trace("Finished executing the Add Query Command.");
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index b101a0f..2aeb90c 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -20,32 +20,17 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.DeleteQuery;
 import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
-import org.apache.rya.streams.api.queries.QueryChange;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -60,7 +45,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public class DeleteQueryCommand implements RyaStreamsCommand {
-    private static final Logger log = 
LoggerFactory.getLogger(DeleteQueryCommand.class);
 
     /**
      * Command line parameters that are used by this command to configure 
itself.
@@ -102,6 +86,17 @@ public class DeleteQueryCommand implements 
RyaStreamsCommand {
     }
 
     @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new RemoveParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
     public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
         requireNonNull(args);
 
@@ -112,34 +107,27 @@ public class DeleteQueryCommand implements 
RyaStreamsCommand {
         } catch(final ParameterException e) {
             throw new ArgumentsException("Could not add a new query because of 
invalid command line parameters.", e);
         }
-        log.trace("Executing the Add Query Command\n" + params.toString());
 
-        // Create properties for interacting with Kafka.
-        final Properties producerProperties = new Properties();
-        
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
-
-        final Producer<?, QueryChange> queryProducer = new 
KafkaProducer<>(producerProperties);
-        final Consumer<?, QueryChange> queryConsumer = new 
KafkaConsumer<>(consumerProperties);
-
-        final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, 
KafkaTopics.queryChangeLogTopic(params.ryaInstance));
-        final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+        // Create the Kafka backed QueryChangeLog.
+        final String bootstrapServers = params.kafkaIP + ":" + 
params.kafkaPort;
+        final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+        final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
         // Execute the delete query command.
-        final DeleteQuery deleteQuery = new DefaultDeleteQuery(repo);
-        try {
-            deleteQuery.delete(UUID.fromString(params.queryId));
-            log.trace("Deleted query: " + params.queryId);
-        } catch (final RyaStreamsException e) {
-            log.error("Unable to delete query with ID: " + params.queryId, e);
+        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+            final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo);
+            try {
+                deleteQuery.delete(UUID.fromString(params.queryId));
+                System.out.println("Deleted query: " + params.queryId);
+            } catch (final RyaStreamsException e) {
+                System.err.println("Unable to delete query with ID: " + 
params.queryId);
+                e.printStackTrace();
+                System.exit(1);
+            }
+        } catch (final Exception e) {
+            System.err.println("Problem encountered while closing the 
QueryRepository.");
+            e.printStackTrace();
+            System.exit(1);
         }
-
-        log.trace("Finished executing the Delete Query Command.");
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index c4e5de6..670007b 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -20,33 +20,18 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.Properties;
 import java.util.Set;
 
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.ListQueries;
 import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
-import org.apache.rya.streams.api.queries.QueryChange;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
-import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.ParameterException;
@@ -59,7 +44,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public class ListQueriesCommand implements RyaStreamsCommand {
-    private static final Logger log = 
LoggerFactory.getLogger(ListQueriesCommand.class);
 
     @Override
     public String getCommand() {
@@ -72,6 +56,17 @@ public class ListQueriesCommand implements RyaStreamsCommand 
{
     }
 
     @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new KafkaParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
     public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
         requireNonNull(args);
 
@@ -82,32 +77,27 @@ public class ListQueriesCommand implements 
RyaStreamsCommand {
         } catch (final ParameterException e) {
             throw new ArgumentsException("Could not list the queries because 
of invalid command line parameters.", e);
         }
-        log.trace("Executing the List Query Command.\n" + params.toString());
 
-        // Create properties for interacting with Kafka.
-        final Properties producerProperties = new Properties();
-        
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
params.kafkaIP + ":" + params.kafkaPort);
-        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
-
-        final Producer<?, QueryChange> queryProducer = new 
KafkaProducer<>(producerProperties);
-        final Consumer<?, QueryChange> queryConsumer = new 
KafkaConsumer<>(consumerProperties);
-
-        final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, 
KafkaTopics.queryChangeLogTopic(params.ryaInstance));
-        final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+        // Create the Kafka backed QueryChangeLog.
+        final String bootstrapServers = params.kafkaIP + ":" + 
params.kafkaPort;
+        final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+        final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
         // Execute the list queries command.
-        final ListQueries listQueries = new DefaultListQueries(repo);
-        try {
-            final Set<StreamsQuery> queries = listQueries.all();
-            System.out.println( formatQueries(queries) );
-        } catch (final RyaStreamsException e) {
-            log.error("Unable to retrieve the queries.", e);
+        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+            final ListQueries listQueries = new DefaultListQueries(queryRepo);
+            try {
+                final Set<StreamsQuery> queries = listQueries.all();
+                System.out.println( formatQueries(queries) );
+            } catch (final RyaStreamsException e) {
+                System.err.println("Unable to retrieve the queries.");
+                e.printStackTrace();
+                System.exit(1);
+            }
+        } catch (final Exception e) {
+            System.err.println("Problem encountered while closing the 
QueryRepository.");
+            e.printStackTrace();
+            System.exit(1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 4763bd8..6ae63da 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -34,8 +34,6 @@ import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -51,7 +49,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public class LoadStatementsCommand implements RyaStreamsCommand {
-    private static final Logger log = 
LoggerFactory.getLogger(LoadStatementsCommand.class);
 
     /**
      * Command line parameters that are used by this command to configure 
itself.
@@ -90,7 +87,7 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
 
     @Override
     public String getDescription() {
-        return "Load RDF Statements into Rya Streams";
+        return "Load RDF Statements into Rya Streams.";
     }
 
     @Override
@@ -103,10 +100,20 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
     }
 
     @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new LoadStatementsParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
     public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
         requireNonNull(args);
 
-
         // Parse the command line arguments.
         final LoadStatementsParameters params = new LoadStatementsParameters();
         try {
@@ -114,9 +121,7 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
         } catch(final ParameterException e) {
             throw new ArgumentsException("Could not load the Statements file 
because of invalid command line parameters.", e);
         }
-        log.trace("Executing the Load Statements Command\n" + 
params.toString());
 
-        log.trace("Loading Statements from the file '" + params.statementsFile 
+ "'.");
         final Path statementsPath = Paths.get(params.statementsFile);
 
         final Properties producerProps = buildProperties(params);
@@ -124,10 +129,8 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
             final LoadStatements statements = new 
KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
             statements.load(statementsPath, params.visibilities);
         } catch (final Exception e) {
-            log.error("Unable to parse statements file: " + 
statementsPath.toString(), e);
+            System.err.println("Unable to parse statements file: " + 
statementsPath.toString());
         }
-
-        log.trace("Finished executing the Load Statements Command.");
     }
 
     private Properties buildProperties(final LoadStatementsParameters params) {
@@ -138,4 +141,4 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
         props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityStatementSerializer.class.getName());
         return props;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
new file mode 100644
index 0000000..9de978b
--- /dev/null
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that streams the results of a query to the console.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StreamResultsCommand implements RyaStreamsCommand {
+
+    /**
+     * Command line parameters that are used by this command to configure 
itself.
+     */
+    private static final class StreamResultsParameters extends 
RyaStreamsCommand.KafkaParameters {
+
+        @Parameter(names = {"--queryId", "-q"}, required = true, description = 
"The query whose results will be streamed to the console.")
+        private String queryId;
+
+        @Override
+        public String toString() {
+            final StringBuilder parameters = new StringBuilder();
+            parameters.append(super.toString());
+
+            if (!Strings.isNullOrEmpty(queryId)) {
+                parameters.append("\tQuery ID: " + queryId);
+                parameters.append("\n");
+            }
+
+            return parameters.toString();
+        }
+    }
+
+    @Override
+    public String getCommand() {
+        return "stream-results";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Stream the results of a query to the console.";
+    }
+
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new 
StreamResultsParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new StreamResultsParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final StreamResultsParameters params = new StreamResultsParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not stream the query's results 
because of invalid command line parameters.", e);
+        }
+
+        final UUID queryId;
+        try {
+            queryId = UUID.fromString( params.queryId );
+        } catch(final IllegalArgumentException e) {
+            throw new ArgumentsException("Invalid Query ID " + params.queryId);
+        }
+
+        // This command executes until the application is killed, so create a 
kill boolean.
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                finished.set(true);
+            }
+        });
+
+        // Execute the command.
+        final GetQueryResultStream getQueryResultStream = new 
KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort);
+
+        try (final QueryResultStream stream = 
getQueryResultStream.fromNow(queryId)) {
+            while(!finished.get()) {
+                for(final VisibilityBindingSet visBs : stream.poll(1000)) {
+                    System.out.println(visBs);
+                }
+            }
+        } catch (final Exception e) {
+            System.err.println("Error while reading the results from the 
stream.");
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/resources/log4j.properties 
b/extras/rya.streams/client/src/main/resources/log4j.properties
new file mode 100644
index 0000000..b07468c
--- /dev/null
+++ b/extras/rya.streams/client/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 09e874c..ee4378e 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -42,7 +42,6 @@ import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +51,7 @@ import org.junit.Test;
 /**
  * integration Test for adding a new query through a command.
  */
-public class AddQueryCommandIT extends KafkaITBase {
+public class AddQueryCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 0079371..c5dad3d 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -43,7 +43,6 @@ import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.After;
 import org.junit.Before;
@@ -53,7 +52,7 @@ import org.junit.Test;
 /**
  * Integration Test for deleting a query from Rya Streams through a command.
  */
-public class DeleteQueryCommandIT extends KafkaITBase {
+public class DeleteQueryCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index eb759ba..b32967e 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -38,7 +38,6 @@ import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
-import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.After;
 import org.junit.Before;
@@ -48,7 +47,7 @@ import org.junit.Test;
 /**
  * integration Test for listing queries through a command.
  */
-public class ListQueryCommandIT extends KafkaITBase {
+public class ListQueryCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
index 19622ae..9403e4b 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
@@ -23,11 +23,14 @@ import static java.util.Objects.requireNonNull;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.rya.streams.api.queries.ChangeLogEntry;
 import org.apache.rya.streams.api.queries.QueryChange;
@@ -76,7 +79,17 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
     @Override
     public void write(final QueryChange newChange) throws 
QueryChangeLogException {
         requireNonNull(newChange);
-        producer.send(new ProducerRecord<>(topic, newChange));
+
+        // Write the change to the log immediately.
+        final Future<RecordMetadata> future = producer.send(new 
ProducerRecord<>(topic, newChange));
+        producer.flush();
+
+        // Don't return until the write has been completed.
+        try {
+            future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new QueryChangeLogException("Could not record a new query 
change to the Kafka Query Change Log.", e);
+        }
     }
 
     @Override
@@ -96,6 +109,15 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
     }
 
     /**
+     * Closing this class will also close the {@link Producer} and {@link 
Consumer} that were passed into it.
+     */
+    @Override
+    public void close() throws Exception {
+        producer.close();
+        consumer.close();
+    }
+
+    /**
      * A {@link CloseableIteration} to iterate over a consumer's results. Since
      * the consumer returns in bulk when poll(), a cache of recent polling is
      * maintained.
@@ -149,10 +171,10 @@ public class KafkaQueryChangeLog implements 
QueryChangeLog {
             final ConsumerRecords<?, QueryChange> records = 
consumer.poll(3000L);
             final List<ChangeLogEntry<QueryChange>> changes = new 
ArrayList<>();
             records.forEach(
-                    record -> 
-                        changes.add(new 
ChangeLogEntry<QueryChange>(record.offset(), record.value()))
+                    record ->
+                        changes.add(new ChangeLogEntry<>(record.offset(), 
record.value()))
                     );
             iterCache = changes.iterator();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java
new file mode 100644
index 0000000..5042b30
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.queries;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.queries.QueryChange;
+import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates instances of {@link KafkaQueryChangeLog}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaQueryChangeLogFactory {
+
+    /**
+     * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link 
Producer} and {@link Consumer}.
+     *
+     * @param bootstrapServers - Indicates which instance of Kafka that will 
be connected to. (not null)
+     * @param topic - The topic the QueryChangeLog is persisted to. (not null)
+     * @return A new instance of {@link KafkaQueryChangeLog}.
+     */
+    public static KafkaQueryChangeLog make(
+            final String bootstrapServers,
+            final String topic) {
+        requireNonNull(bootstrapServers);
+        requireNonNull(topic);
+
+        final Properties producerProperties = new Properties();
+        
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
+
+        final Properties consumerProperties = new Properties();
+        
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
+
+        final Producer<?, QueryChange> producer = new 
KafkaProducer<>(producerProperties);
+        final Consumer<?, QueryChange> consumer = new 
KafkaConsumer<>(consumerProperties);
+        return new KafkaQueryChangeLog(producer, consumer, topic);
+    }
+}
\ No newline at end of file

Reply via email to