Repository: incubator-rya
Updated Branches:
  refs/heads/master 3d4a5d0e6 -> 59b20263c


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java 
b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
new file mode 100644
index 0000000..5f7df84
--- /dev/null
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.shell.util.StreamsQueryFormatter;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Optional;
+
+/**
+ * Rya Shell commands used to interact with the Rya Streams subsystem.
+ */
+@Component
+public class RyaStreamsCommands implements CommandMarker {
+
+    // Names of the shell commands whose availability is tied to being connected to a Rya instance
+    // (see areConfigCommandsAvailable()).
+    public static final String STREAMS_CONFIGURE_CMD = "streams-configure";
+    public static final String STREAMS_DETAILS_CMD = "streams-details";
+    // Names of the shell commands whose availability is tied to a Rya Streams client being present
+    // in the shared shell state (see areQueriesCommandsAvailable()).
+    public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add";
+    public static final String STREAM_QUERIES_DELETE_CMD = 
"streams-queries-delete";
+    public static final String STREAM_QUERIES_START_CMD = 
"streams-queries-start";
+    public static final String STREAM_QUERIES_STOP_CMD = 
"streams-queries-stop";
+    public static final String STREAM_QUERIES_LIST_CMD = 
"streams-queries-list";
+    public static final String STREAM_QUERIES_DETAILS_CMD = 
"streams-queries-details";
+
+    // Shared shell state; the commands read the instance name, RyaClient and RyaStreamsClient from it.
+    private final SharedShellState state;
+    // Used by addQuery() to obtain the SPARQL text from the user.
+    private final SparqlPrompt sparqlPrompt;
+
+    /**
+     * Creates the Rya Streams command set for the shell.
+     *
+     * @param state - The state that is shared by every command class of the shell. (not null)
+     * @param sparqlPrompt - Asks the user to enter a SPARQL query. (not null)
+     */
+    @Autowired
+    public RyaStreamsCommands(final SharedShellState state, final SparqlPrompt sparqlPrompt) {
+        // Validate both collaborators up front, then store them.
+        requireNonNull(state);
+        requireNonNull(sparqlPrompt);
+        this.state = state;
+        this.sparqlPrompt = sparqlPrompt;
+    }
+
+    /**
+     * Reports whether the Rya Streams configuration commands may be used. They are offered only
+     * while the shell is connected to a Rya instance.
+     *
+     * @return {@code true} if the shell's connection state is {@link ConnectionState#CONNECTED_TO_INSTANCE}.
+     */
+    @CliAvailabilityIndicator({ STREAMS_CONFIGURE_CMD, STREAMS_DETAILS_CMD })
+    public boolean areConfigCommandsAvailable() {
+        final ConnectionState current = state.getShellState().getConnectionState();
+        return ConnectionState.CONNECTED_TO_INSTANCE == current;
+    }
+
+    /**
+     * Reports whether the query management commands may be used. They are offered only while the
+     * shared shell state holds a Rya Streams client.
+     *
+     * @return {@code true} if a {@link RyaStreamsClient} is present in the shell state.
+     */
+    @CliAvailabilityIndicator({
+        STREAM_QUERIES_ADD_CMD, STREAM_QUERIES_DELETE_CMD,
+        STREAM_QUERIES_START_CMD, STREAM_QUERIES_STOP_CMD,
+        STREAM_QUERIES_LIST_CMD, STREAM_QUERIES_DETAILS_CMD})
+    public boolean areQueriesCommandsAvailable() {
+        final Optional<RyaStreamsClient> streamsClient = state.getShellState().getRyaStreamsCommands();
+        return streamsClient.isPresent();
+    }
+
+    /**
+     * Records a Rya Streams subsystem in the connected Rya instance's details and connects the
+     * shell to that subsystem.
+     * <p/>
+     * If the shell already held a Rya Streams client, it is closed only after the new client has
+     * been stored in the shared state. A failure part way through therefore leaves the previous,
+     * still open, client in place instead of a closed one.
+     *
+     * @param kafkaHostname - The hostname of the Kafka Broker.
+     * @param kafkaPort - The port of the Kafka Broker.
+     * @return A message describing whether the subsystem was newly configured or replaced.
+     * @throws RuntimeException The instance's Rya Details could not be updated.
+     */
+    @CliCommand(value = STREAMS_CONFIGURE_CMD, help = "Connect a Rya Streams subsystem to a Rya Instance.")
+    public String configureRyaStreams(
+            @CliOption(key = {"kafkaHostname"}, mandatory = true, help = "The hostname of the Kafka Broker.")
+            final String kafkaHostname,
+            @CliOption(key = {"kafkaPort"}, mandatory = true, help = "The port of the Kafka Broker.")
+            final int kafkaPort) {
+
+        // Remember the client that was in use before this command, if any, so it can be released later.
+        final Optional<RyaStreamsClient> oldClient = state.getShellState().getRyaStreamsCommands();
+
+        // Update the Rya Details for the connected Rya Instance.
+        final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+        final RyaClient ryaClient = state.getShellState().getConnectedCommands().get();
+        try {
+            final RyaStreamsDetails streamsDetails = new RyaStreamsDetails(kafkaHostname, kafkaPort);
+            ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails);
+        } catch (final RyaClientException e) {
+            throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new " +
+                    "information. This command failed to complete.", e);
+        }
+
+        // Connect a Rya Streams Client and set it in the shared state.
+        final RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+        state.connectedToRyaStreams(newClient);
+
+        if(oldClient.isPresent()) {
+            // The replacement is in place, so the previous client can now be released. Closing is
+            // best effort: a failure is reported but does not fail the command.
+            try {
+                oldClient.get().close();
+            } catch (final Exception e) {
+                System.err.println("Warning: Could not close the old Rya Streams Client.");
+                e.printStackTrace();
+            }
+
+            return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were " +
+                    "maintained by the previous subsystem will need to be migrated to the new one.";
+        }
+
+        return "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
+                "Rya Streams commands are now available while connected to this instance.";
+    }
+
+    /**
+     * Describes the Rya Streams subsystem, if any, that the connected Rya instance is configured to use.
+     *
+     * @return Either a message explaining why no subsystem information is available, or the Kafka
+     *   hostname and port stored in the instance's Rya Streams details.
+     * @throws RuntimeException The Rya Details could not be fetched.
+     */
+    @CliCommand(value = STREAMS_DETAILS_CMD, help = "Print information about which Rya Streams subsystem the Rya instance is connected to.")
+    public String printRyaStreamsDetails() {
+        final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+        final RyaClient client = state.getShellState().getConnectedCommands().get();
+        try {
+            final Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance);
+            if(details.isPresent()) {
+                final Optional<RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails();
+                if(streamsDetails.isPresent()) {
+                    // The instance is configured, so report which subsystem it uses.
+                    return "Kafka Hostname: " + streamsDetails.get().getHostname() + ", Kafka Port: " + streamsDetails.get().getPort();
+                }
+
+                // There are Rya Details, but they do not name a Rya Streams subsystem.
+                return "This instance of Rya has not been configured to use a Rya Streams subsystem.";
+            }
+
+            // Without Rya Details there is nowhere to look for the configuration.
+            return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
+
+        } catch (final RyaClientException e) {
+            throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e);
+        }
+    }
+
+    /**
+     * Prompts the user for a SPARQL query and registers it with the Rya Streams subsystem.
+     *
+     * @param inactive - {@code true} to add the query without running it.
+     * @return A message holding the new query's ID, or an empty string if the user aborted the prompt.
+     * @throws RuntimeException The prompt failed or the query could not be added.
+     */
+    @CliCommand(value = STREAM_QUERIES_ADD_CMD, help = "Add a SPARQL query to the Rya Streams subsystem.")
+    public String addQuery(
+            @CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+                       help = "Setting this flag will add the query, but not run it. (default: false)")
+            final boolean inactive) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+        try {
+            // Ask the user for the SPARQL that defines the query.
+            final Optional<String> sparql = sparqlPrompt.getSparql();
+            if(sparql.isPresent()) {
+                final boolean isActive = !inactive;
+                final StreamsQuery added = streamsClient.getAddQuery().addQuery(sparql.get(), isActive);
+                return "The added query's ID is " + added.getQueryId();
+            }
+
+            // The user aborted the prompt, so there is nothing to report.
+            return "";
+
+        } catch (final IOException | RyaStreamsException e) {
+            throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
+        }
+    }
+
+    /**
+     * Removes a query from the Rya Streams subsystem.
+     *
+     * @param queryId - The ID of the query to remove, in {@link UUID} string form.
+     * @return A confirmation message.
+     * @throws RuntimeException The query could not be deleted.
+     */
+    @CliCommand(value = STREAM_QUERIES_DELETE_CMD, help = "Delete a SPARQL query from the Rya Streams subsystem.")
+    public String deleteQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to remove.")
+            final String queryId) {
+
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        final UUID parsedId = UUID.fromString(queryId);
+        try {
+            streamsClient.getDeleteQuery().delete(parsedId);
+            return "The query has been deleted.";
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_START_CMD, help = "Start processing a 
SPARQL query using the Rya Streams subsystem.")
+    public String startQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of 
the query to start processing.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = 
state.getShellState().getRyaStreamsCommands().get();
+
+        try {
+            // Ensure the query exists.
+            final UUID id = UUID.fromString(queryId);
+            final java.util.Optional<StreamsQuery> streamsQuery = 
streamsClient.getGetQuery().getQuery(id);
+            if(!streamsQuery.isPresent()) {
+                throw new RuntimeException("No Rya Streams query exists for ID 
" + queryId);
+            }
+
+            // Ensure it isn't already started.
+            if(streamsQuery.get().isActive()) {
+                return "That query is already running.";
+            }
+
+            // Start it.
+            streamsClient.getStartQuery().start(id);
+            return "The query will be processed by the Rya Streams subsystem.";
+
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to start the Query.", e);
+        }
+    }
+
+    /**
+     * Asks the Rya Streams subsystem to stop processing a query that is currently active.
+     *
+     * @param queryId - The ID of the query to stop processing, in {@link UUID} string form.
+     * @return A message stating that the query was stopped or that it was already stopped.
+     * @throws RuntimeException No query has the given ID, or the subsystem reported a failure.
+     */
+    @CliCommand(value = STREAM_QUERIES_STOP_CMD, help = "Stop processing a SPARQL query using the Rya Streams subsystem.")
+    public String stopQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to stop processing.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+        try {
+            // Ensure the query exists.
+            final UUID id = UUID.fromString(queryId);
+            final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
+            if(!streamsQuery.isPresent()) {
+                throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
+            }
+
+            // Ensure it isn't already stopped.
+            if(!streamsQuery.get().isActive()) {
+                return "That query is already stopped.";
+            }
+
+            // Stop it.
+            streamsClient.getStopQuery().stop(id);
+            return "The query will no longer be processed by the Rya Streams subsystem.";
+
+        } catch (final RyaStreamsException e) {
+            // Report the failure as a stop failure, since this is the stop command.
+            throw new RuntimeException("Unable to stop the Query.", e);
+        }
+    }
+
+    /**
+     * Lists every query that the configured Rya Streams subsystem is managing.
+     *
+     * @return The queries, rendered by {@link StreamsQueryFormatter}.
+     * @throws RuntimeException The queries could not be fetched or could not be formatted.
+     */
+    @CliCommand(value = STREAM_QUERIES_LIST_CMD, help = "List the queries that are being managed by the configured Rya Streams subsystem.")
+    public String listQueries() {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        try {
+            // Fetch and render in one step; the catch clauses below tell the two failure kinds apart.
+            return StreamsQueryFormatter.format(streamsClient.getListQueries().all());
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e);
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to print the query to the console.", e);
+        }
+    }
+
+    /**
+     * Shows the details of a single query that is managed by the Rya Streams subsystem.
+     *
+     * @param queryId - The ID of the query whose details will be printed, in {@link UUID} string form.
+     * @return The formatted query, or a message stating that no query has the given ID.
+     * @throws RuntimeException The query could not be fetched or could not be formatted.
+     */
+    @CliCommand(value = STREAM_QUERIES_DETAILS_CMD, help = "Print detailed information about a specific query managed by the Rya Streams subsystem.")
+    public String printQueryDetails(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query whose details will be printed.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        final UUID parsedId = UUID.fromString(queryId);
+        try {
+            final java.util.Optional<StreamsQuery> found = streamsClient.getGetQuery().getQuery(parsedId);
+            return found.isPresent()
+                    ? StreamsQueryFormatter.format(found.get())
+                    : "There is no query with the specified ID.";
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e);
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to print the query to the console.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java 
b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
index bb22fd3..58bcfbe 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.streams.api.RyaStreamsClient;
 
 import com.google.common.base.Optional;
 import com.mongodb.MongoClient;
@@ -159,6 +161,34 @@ public class SharedShellState {
     }
 
     /**
+     * This method indicates a shift into a state where the shell may have to 
interact with the Rya Streams subsystem.
+     * <p/>
+     * Stores the {@link RyaStreamsClient} all Rya Streams commands will be 
executed against.
+     *
+     * @param ryaStreamsCommands - Rya Streams commands that will execute 
against the Rya Streams subsystem. (not null)
+     */
+    public void connectedToRyaStreams(
+            final RyaStreamsClient ryaStreamsCommands) {
+        requireNonNull(ryaStreamsCommands);
+
+        lock.lock();
+        try {
+            // Verify the Rya Shell is connected to an instance.
+            if(shellState.getConnectionState() != 
ConnectionState.CONNECTED_TO_INSTANCE) {
+                throw new IllegalStateException("You can not set the connected 
Rya Streams Client before connected to a Rya Instance.");
+            }
+
+            // Set the connected Rya Streams commands.
+            shellState = ShellState.builder( shellState )
+                    .setRyaStreamsCommands(ryaStreamsCommands)
+                    .build();
+
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
      * This method indicates a shift into the {@link DISCONNECTED} state.
      * <p/>
      * Clears all of the values associated with a Rya Storage/Instance 
connection.
@@ -225,6 +255,7 @@ public class SharedShellState {
         private final Optional<MongoConnectionDetails> mongoDetails;
         private final Optional<MongoClient> mongoAdminClient;
         private final Optional<RyaClient> connectedCommands;
+        private final Optional<RyaStreamsClient> ryaStreamsCommands;
 
         // Instance specific values.
         private final Optional<String> instanceName;
@@ -236,7 +267,8 @@ public class SharedShellState {
                 final Optional<MongoConnectionDetails> mongoDetails,
                 final Optional<MongoClient> mongoAdminClient,
                 final Optional<RyaClient> connectedCommands,
-                final Optional<String> instanceName) {
+                final Optional<String> instanceName,
+                final Optional<RyaStreamsClient> ryaStreamsCommands) {
             this.connectionState = requireNonNull(connectionState);
             this.storageType = requireNonNull(storageType);
             this.accumuloDetails = requireNonNull(accumuloDetails);
@@ -244,6 +276,7 @@ public class SharedShellState {
             this.mongoAdminClient = requireNonNull(mongoAdminClient);
             this.connectedCommands = requireNonNull(connectedCommands);
             this.instanceName = requireNonNull(instanceName);
+            this.ryaStreamsCommands = requireNonNull(ryaStreamsCommands);
         }
 
         /**
@@ -294,6 +327,15 @@ public class SharedShellState {
         }
 
         /**
+         * @return The {@link RyaStreamsClient} to use when a command on the 
shell is issued.
+         *   The value will not be present if the Rya Shell is not connected 
to an instance
+         *   whose {@link RyaDetails} indicate which Rya Streams system to use.
+         */
+        public Optional<RyaStreamsClient> getRyaStreamsCommands() {
+            // Returned as stored; the field is required to be non-null by the constructor.
+            return ryaStreamsCommands;
+        }
+
+        /**
          * @return The name of the Rya Instance the Rya Shell is issuing 
commands to.
          *   The value will not be present if the Rya Shell is not connected 
to a
          *   storage or if a target instance has not been set yet.
@@ -353,6 +395,7 @@ public class SharedShellState {
             private MongoConnectionDetails mongoDetails;
             private MongoClient mongoAdminClient;
             private RyaClient connectedCommands;
+            private RyaStreamsClient ryaStreamsCommands;
 
             // Instance specific values.
             private String instanceName;
@@ -375,6 +418,7 @@ public class SharedShellState {
                 this.mongoDetails = shellState.getMongoDetails().orNull();
                 this.mongoAdminClient = 
shellState.getMongoAdminClient().orNull();
                 this.connectedCommands = 
shellState.getConnectedCommands().orNull();
+                this.ryaStreamsCommands = 
shellState.getRyaStreamsCommands().orNull();
                 this.instanceName = shellState.getRyaInstanceName().orNull();
             }
 
@@ -435,6 +479,15 @@ public class SharedShellState {
             }
 
             /**
+             * @param ryaStreamsCommands - The {@link RyaStreamsClient} to use 
when a command on the shell is issued.
+             * @return This {@link Builder} so that method invocations may be 
chained.
+             */
+            public Builder setRyaStreamsCommands(@Nullable final 
RyaStreamsClient ryaStreamsCommands) {
+                // A null value is permitted here; build() wraps this field with Optional.fromNullable.
+                this.ryaStreamsCommands = ryaStreamsCommands;
+                return this;
+            }
+
+            /**
              * @param instanceName - The name of the Rya Instance the Rya 
Shell is issuing commands to.
              * @return This {@link Builder} so that method invocations may be 
chained.
              */
@@ -454,7 +507,8 @@ public class SharedShellState {
                         Optional.fromNullable(mongoDetails),
                         Optional.fromNullable(mongoAdminClient),
                         Optional.fromNullable(connectedCommands),
-                        Optional.fromNullable(instanceName));
+                        Optional.fromNullable(instanceName),
+                        Optional.fromNullable(ryaStreamsCommands));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
 
b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
new file mode 100644
index 0000000..6c06caf
--- /dev/null
+++ 
b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Pretty formats {@link StreamsQuery} objects for display.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class StreamsQueryFormatter {
+
+    /**
+     * Pretty formats a {@link StreamsQuery}.
+     * <p/>
+     * The result has one line for the query's ID, one for whether it is active, and then the
+     * rendered SPARQL. Continuation lines of the SPARQL are indented so that they line up beneath
+     * the first SPARQL line. Every line ends with a newline.
+     *
+     * @param query - The query to format. (not null)
+     * @return The pretty formatted string.
+     * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+     */
+    public static String format(final StreamsQuery query) throws Exception {
+        requireNonNull(query);
+
+        // Pretty format the SPARQL query by parsing it and rendering it again.
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(query.getSparql(), null);
+        final String prettySparql = new SPARQLQueryRenderer().render(parsedQuery);
+        final String[] lines = prettySparql.split("\n");
+
+        // Create the formatted string. All labels have the same width so the values line up.
+        final StringBuilder builder = new StringBuilder();
+        builder.append(" Query ID: ").append( query.getQueryId() ).append("\n");
+        builder.append("Is Active: ").append( query.isActive() ).append("\n");
+        builder.append("   SPARQL: ").append( lines[0] ).append("\n");
+
+        // Remaining SPARQL lines are padded to the width of the labels above.
+        for(int i = 1; i < lines.length; i++) {
+            builder.append("           ").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Pretty formats a collection of {@link StreamsQuery}s.
+     * They will be sorted based on the string form of their Query IDs.
+     * <p/>
+     * A collection holding exactly one query is formatted like {@link #format(StreamsQuery)},
+     * without separator lines. Otherwise the result starts with a separator line and each query
+     * is followed by another one, so an empty collection yields a single separator line.
+     *
+     * @param queries - The queries to format. (not null)
+     * @return The pretty formatted string.
+     * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+     */
+    public static String format(final Collection<StreamsQuery> queries) throws Exception {
+        requireNonNull(queries);
+
+        if(queries.size() == 1) {
+            return format(queries.iterator().next());
+        }
+
+        // Sort the queries based on their IDs.
+        final List<StreamsQuery> sorted = Lists.newArrayList(queries);
+        sorted.sort((query1, query2) -> {
+            final String id1 = query1.getQueryId().toString();
+            final String id2 = query2.getQueryId().toString();
+            return id1.compareTo(id2);
+        });
+
+        // Format a list of the queries.
+        final StringBuilder builder = new StringBuilder();
+        builder.append("-----------------------------------------------\n");
+        for(final StreamsQuery query : sorted) {
+            builder.append( format(query) );
+            builder.append("-----------------------------------------------\n");
+        }
+        return builder.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml 
b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
index 361bf27..2473af1 100644
--- a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
+++ b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
@@ -31,7 +31,6 @@
     <!-- Define the shell state bean that will be shared across all of the 
commands. -->
     <bean id="sharedShellState" class="org.apache.rya.shell.SharedShellState" 
/>
     <bean id="passwordPrompt" 
class="org.apache.rya.shell.util.PasswordPrompt.JLinePasswordPrompt" />
-<!--     <bean id="installPrompt" 
class="org.apache.rya.shell.util.InstallPrompt.JLineAccumuloInstallPrompt" /> 
-->
     <bean id="installPrompt" 
class="org.apache.rya.shell.util.InstallPrompt.JLineInstallPropmpt" />
     <bean id="uninstallPrompt" 
class="org.apache.rya.shell.util.UninstallPrompt.JLineUninstallPrompt" />
     <bean id="sparqlPrompt" 
class="org.apache.rya.shell.util.SparqlPrompt.JLineSparqlPrompt" />
@@ -41,11 +40,11 @@
     <bean id="ryaConnectionCommands" 
class="org.apache.rya.shell.RyaConnectionCommands" />
     <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" 
/>
     <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
+    <bean id="ryaStreamsCommands" 
class="org.apache.rya.shell.RyaStreamsCommands" />
     
     <!--  
     <bean id="springHelpCommands" 
class="org.springframework.shell.commands.HelpCommands" />
     <bean id="springScriptCommands" 
class="org.springframework.shell.commands.ScriptCommands" />
     <bean id="springExitCommands" 
class="org.springframework.shell.commands.ExitCommands" />
     -->
-    
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java 
b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
new file mode 100644
index 0000000..9f5a794
--- /dev/null
+++ 
b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link RyaStreamsCommands}.
+ */
+public class RyaStreamsCommandsTest {
+
+    @Test
+    public void configureRyaStreams() throws Exception {
+        // Mock the object that performs the configure operation.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final SetRyaStreamsConfiguration setStreams = 
mock(SetRyaStreamsConfiguration.class);
+        
when(mockCommands.getSetRyaStreamsConfiguration()).thenReturn(setStreams);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Verify that no Rya Streams Client is set to the state.
+        assertFalse(state.getShellState().getRyaStreamsCommands().isPresent());
+
+        try {
+            // Execute the command.
+            final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+            final String message = commands.configureRyaStreams("localhost", 
6);
+
+            // Verify the request was forwarded to the mocked interactor.
+            final RyaStreamsDetails expectedDetails = new 
RyaStreamsDetails("localhost", 6);
+            verify(setStreams).setRyaStreamsConfiguration(eq("unitTest"), 
eq(expectedDetails));
+
+            // Verify a RyaStreamsClient was created and added to the state.
+            
assertTrue(state.getShellState().getRyaStreamsCommands().isPresent());
+
+            // Verify the correct message is reported.
+            final String expected = "The Rya Instance has been updated to use 
the provided Rya Streams subsystem. " +
+                    "Rya Streams commands are now avaiable while connected to 
this instance.";
+            assertEquals(expected, message);
+        } finally {
+            state.getShellState().getRyaStreamsCommands().get().close();
+        }
+    }
+
+    @Test
+    public void printRyaStreamsDetails_noRyaDetails() throws Exception {
+        // Mock the object that looks up the Rya instance's details.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they are not found.
+        
when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.absent());
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "This instance does not have any Rya Details, 
so it is unable to be connected to the Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printRyaStreamsDetails_notConfigured() throws Exception {
+        // Mock the object that looks up the Rya instance's details.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they do not have RyaStreamsDetails to print.
+        final RyaDetails details = mock(RyaDetails.class);
+        when(details.getRyaStreamsDetails()).thenReturn(Optional.absent());
+        
when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "This instance of Rya has not been configured 
to use a Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printRyaStreamsDetails_configured() throws Exception {
+        // Mock the object that looks up the Rya instance's details.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they do have RyaStreamsDetails to print.
+        final RyaDetails details = mock(RyaDetails.class);
+        when(details.getRyaStreamsDetails()).thenReturn(Optional.of(new 
RyaStreamsDetails("localhost", 6)));
+        
when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "Kafka Hostname: localhost, Kafka Port: 6";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void addQuery_userAbortsSparqlPrompt() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final AddQuery addQuery = mock(AddQuery.class);
+        when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+        // Mock a SPARQL prompt that a user aborts.
+        final SparqlPrompt prompt = mock(SparqlPrompt.class);
+        when(prompt.getSparql()).thenReturn(Optional.absent());
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
prompt);
+        final String message = commands.addQuery(false);
+
+        // Verify an empty message is returned because the user aborted the SPARQL prompt.
+        assertEquals("", message);
+    }
+
+    @Test
+    public void addQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final AddQuery addQuery = mock(AddQuery.class);
+        when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+        final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), 
"query", true);
+        when(addQuery.addQuery(eq("query"), eq(true))).thenReturn(addedQuery);
+
+        // Mock a SPARQL prompt that a user entered a query through.
+        final SparqlPrompt prompt = mock(SparqlPrompt.class);
+        when(prompt.getSparql()).thenReturn(Optional.of("query"));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
prompt);
+        final String message = commands.addQuery(false);
+
+        // Verify the interactor was invoked with the provided input.
+        verify(addQuery).addQuery("query", true);
+
+        // Verify a message is printed to the user.
+        final String expected = "The added query's ID is " + 
addedQuery.getQueryId();
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void deleteQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final DeleteQuery deleteQuery = mock(DeleteQuery.class);
+        when(mockClient.getDeleteQuery()).thenReturn(deleteQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final UUID queryId = UUID.randomUUID();
+        final String message = commands.deleteQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(deleteQuery).delete(eq(queryId));
+
+        // Verify a message is printed to the user.
+        final String expected = "The query has been deleted.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void startQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StartQuery startQuery = mock(StartQuery.class);
+        when(mockClient.getStartQuery()).thenReturn(startQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as not running.
+        final UUID queryId = UUID.randomUUID();
+        
when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new 
StreamsQuery(queryId, "sparql", false)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.startQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(startQuery).start(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "The query will be processed by the Rya 
Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void startQuery_alreadyRunning() throws Exception{
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StartQuery startQuery = mock(StartQuery.class);
+        when(mockClient.getStartQuery()).thenReturn(startQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as running.
+        final UUID queryId = UUID.randomUUID();
+        
when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new 
StreamsQuery(queryId, "sparql", true)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.startQuery(queryId.toString());
+
+        // Verify the interactor was not invoked.
+        verify(startQuery, never()).start(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "That query is already running.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void stopQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StopQuery stopQuery = mock(StopQuery.class);
+        when(mockClient.getStopQuery()).thenReturn(stopQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as running.
+        final UUID queryId = UUID.randomUUID();
+        
when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new 
StreamsQuery(queryId, "sparql", true)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.stopQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(stopQuery).stop(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "The query will no longer be processed by the 
Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void stopQuery_alreadyStopped() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StopQuery stopQuery = mock(StopQuery.class);
+        when(mockClient.getStopQuery()).thenReturn(stopQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as not running.
+        final UUID queryId = UUID.randomUUID();
+        
when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new 
StreamsQuery(queryId, "sparql", false)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.stopQuery(queryId.toString());
+
+        // Verify the interactor was not invoked with the provided parameters.
+        verify(stopQuery, never()).stop(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "That query is already stopped.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void listQueries() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final ListQueries listQueries = mock(ListQueries.class);
+        when(mockClient.getListQueries()).thenReturn(listQueries);
+
+        final Set<StreamsQuery> queries = Sets.newHashSet(
+                new StreamsQuery(
+                        
UUID.fromString("33333333-3333-3333-3333-333333333333"),
+                        "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+                        true),
+                new StreamsQuery(
+                        
UUID.fromString("11111111-1111-1111-1111-111111111111"),
+                        "SELECT * WHERE { ?a ?b ?c . }",
+                        true),
+                new StreamsQuery(
+                        
UUID.fromString("22222222-2222-2222-2222-222222222222"),
+                        "SELECT * WHERE { ?d ?e ?f . }",
+                        false));
+        when(listQueries.all()).thenReturn(queries);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.listQueries();
+
+        // Verify the correct report is returned.
+        final String expected =
+                "-----------------------------------------------\n" +
+                " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+                "Is Active: false\n" +
+                "   SPARQL: select ?d ?e ?f\n" +
+                "           where {\n" +
+                "             ?d ?e ?f.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?person ?business\n" +
+                "           where {\n" +
+                "             ?person <urn:worksAt> ?business.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printQueryDetails() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        final UUID queryId = 
UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa");
+        final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { 
?a ?b ?c . }", true);
+        
when(getQuery.getQuery(queryId)).thenReturn(java.util.Optional.of(query));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), 
mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, 
mock(SparqlPrompt.class));
+        final String message = commands.printQueryDetails(queryId.toString());
+
+        // Verify the correct report is returned.
+        final String expected =
+                " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n";
+        assertEquals(expected, message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java 
b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
index b5f136c..e1ad039 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
@@ -25,6 +25,7 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.shell.SharedShellState.ConnectionState;
 import org.apache.rya.shell.SharedShellState.ShellState;
+import org.apache.rya.streams.api.RyaStreamsClient;
 import org.junit.Test;
 
 /**
@@ -163,4 +164,34 @@ public class SharedShellStateTest {
                 .build();
         assertEquals(expected, state.getShellState());
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void connectedToRyaStreams_notConnectedToInstance() throws 
Exception {
+        // Create a shell state that is not connected to an instance.
+        final SharedShellState state = new SharedShellState();
+
+        // Connecting to a Rya Streams system fails.
+        state.connectedToRyaStreams( mock(RyaStreamsClient.class) );
+    }
+
+    @Test
+    public void connectedToRyaStreams() {
+        // Create a shell state.
+        final SharedShellState state = new SharedShellState();
+
+        // Connect to Accumulo.
+        final AccumuloConnectionDetails connectionDetails = 
mock(AccumuloConnectionDetails.class);
+        final RyaClient connectedCommands = mock(RyaClient.class);
+        state.connectedToAccumulo(connectionDetails, connectedCommands);
+
+        // Connect to an Instance.
+        state.connectedToInstance("instance");
+
+        // Connect to Rya Streams for the instance.
+        final RyaStreamsClient streamsClient = mock(RyaStreamsClient.class);
+        state.connectedToRyaStreams(streamsClient);
+
+        // Verify the state.
+        assertEquals(streamsClient, 
state.getShellState().getRyaStreamsCommands().get());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
----------------------------------------------------------------------
diff --git 
a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
 
b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
new file mode 100644
index 0000000..8e5d251
--- /dev/null
+++ 
b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StreamsQueryFormatter}.
+ */
+public class StreamsQueryFormatterTest {
+
+    @Test
+    public void formatQuery() throws Exception {
+        // Format the query.
+        final StreamsQuery query = new StreamsQuery(
+                UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"),
+                "SELECT * WHERE { ?a ?b ?c . }",
+                true);
+        final String formatted = StreamsQueryFormatter.format(query);
+
+        // Ensure it has the expected format.
+        final String expected =
+                " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n";
+
+        assertEquals(expected, formatted);
+    }
+
+    @Test
+    public void formatQueries() throws Exception {
+        // Format the queries.
+        final Set<StreamsQuery> queries = Sets.newHashSet(
+                new StreamsQuery(
+                        
UUID.fromString("33333333-3333-3333-3333-333333333333"),
+                        "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+                        true),
+                new StreamsQuery(
+                        
UUID.fromString("11111111-1111-1111-1111-111111111111"),
+                        "SELECT * WHERE { ?a ?b ?c . }",
+                        true),
+                new StreamsQuery(
+                        
UUID.fromString("22222222-2222-2222-2222-222222222222"),
+                        "SELECT * WHERE { ?d ?e ?f . }",
+                        false));
+
+        final String formatted = StreamsQueryFormatter.format(queries);
+
+        // Ensure it has the expected format.
+        final String expected =
+                "-----------------------------------------------\n" +
+                " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+                "Is Active: false\n" +
+                "   SPARQL: select ?d ?e ?f\n" +
+                "           where {\n" +
+                "             ?d ?e ?f.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?person ?business\n" +
+                "           where {\n" +
+                "             ?person <urn:worksAt> ?business.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n";
+        assertEquals(expected, formatted);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/resources/RyaShellTest-context.xml
----------------------------------------------------------------------
diff --git a/extras/shell/src/test/resources/RyaShellTest-context.xml 
b/extras/shell/src/test/resources/RyaShellTest-context.xml
index f7ffe0f..54c44cf 100644
--- a/extras/shell/src/test/resources/RyaShellTest-context.xml
+++ b/extras/shell/src/test/resources/RyaShellTest-context.xml
@@ -60,4 +60,5 @@
     <bean id="ryaConnectionCommands" 
class="org.apache.rya.shell.RyaConnectionCommands" />
     <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
     <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" 
/>
+    <bean id="ryaStreamsCommands" 
class="org.apache.rya.shell.RyaStreamsCommands" />
 </beans>
\ No newline at end of file

Reply via email to