Repository: incubator-rya Updated Branches: refs/heads/master 3d4a5d0e6 -> 59b20263c
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java new file mode 100644 index 0000000..5f7df84 --- /dev/null +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.apache.rya.shell.SharedShellState.ConnectionState; +import org.apache.rya.shell.util.SparqlPrompt; +import org.apache.rya.shell.util.StreamsQueryFormatter; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliAvailabilityIndicator; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import com.google.common.base.Optional; + +/** + * Rya Shell commands used to interact with the Rya Streams subsystem. + */ +@Component +public class RyaStreamsCommands implements CommandMarker { + + public static final String STREAMS_CONFIGURE_CMD = "streams-configure"; + public static final String STREAMS_DETAILS_CMD = "streams-details"; + public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add"; + public static final String STREAM_QUERIES_DELETE_CMD = "streams-queries-delete"; + public static final String STREAM_QUERIES_START_CMD = "streams-queries-start"; + public static final String STREAM_QUERIES_STOP_CMD = "streams-queries-stop"; + public static final String STREAM_QUERIES_LIST_CMD = "streams-queries-list"; + public static final String STREAM_QUERIES_DETAILS_CMD = "streams-queries-details"; + + private final SharedShellState state; + private final SparqlPrompt sparqlPrompt; + + /** + * Constructs an instance of {@link RyaStreamsCommands}. + * + * @param state - Holds shared state between all of the command classes. (not null) + * @param sparqlPrompt - Prompts a user for a SPARQL query. (not null) + */ + @Autowired + public RyaStreamsCommands( + final SharedShellState state, + final SparqlPrompt sparqlPrompt) { + this.state = requireNonNull(state); + this.sparqlPrompt = requireNonNull(sparqlPrompt); + } + + /** + * Enables commands that become available when connected to a Rya instance. + */ + @CliAvailabilityIndicator({ + STREAMS_CONFIGURE_CMD, + STREAMS_DETAILS_CMD}) + public boolean areConfigCommandsAvailable() { + return state.getShellState().getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE; + } + + /** + * Enables commands that become available when a Rya instance has a configured Rya Streams subsystem to use. + */ + @CliAvailabilityIndicator({ + STREAM_QUERIES_ADD_CMD, + STREAM_QUERIES_DELETE_CMD, + STREAM_QUERIES_START_CMD, + STREAM_QUERIES_STOP_CMD, + STREAM_QUERIES_LIST_CMD, + STREAM_QUERIES_DETAILS_CMD}) + public boolean areQueriesCommandsAvailable() { + return state.getShellState().getRyaStreamsCommands().isPresent(); + } + + @CliCommand(value = STREAMS_CONFIGURE_CMD, help = "Connect a Rya Streams subsystem to a Rya Instance.") + public String configureRyaStreams( + @CliOption(key = {"kafkaHostname"}, mandatory = true, help = "The hostname of the Kafka Broker.") + final String kafkaHostname, + @CliOption(key = {"kafkaPort"}, mandatory = true, help = "The port of the Kafka Broker.") + final int kafkaPort) { + + // If this instance was connected to a different Rya Streams subsystem, then close that client. + final Optional<RyaStreamsClient> oldClient = state.getShellState().getRyaStreamsCommands(); + if(oldClient.isPresent()) { + try { + oldClient.get().close(); + } catch (final Exception e) { + System.err.print("Warning: Could not close the old Rya Streams Client."); + e.printStackTrace(); + } + } + + // Update the Rya Details for the connected Rya Instance. + final String ryaInstance = state.getShellState().getRyaInstanceName().get(); + final RyaClient ryaClient = state.getShellState().getConnectedCommands().get(); + try { + final RyaStreamsDetails streamsDetails = new RyaStreamsDetails(kafkaHostname, kafkaPort); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails); + } catch (final RyaClientException e) { + throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new " + + "information. This command failed to complete.", e); + } + + // Connect a Rya Streams Client and set it in the shared state. + final RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort); + state.connectedToRyaStreams(newClient); + + // Return a message that indicates the operation was successful. + if(oldClient.isPresent()) { + return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were " + + "maintained by the previous subsystem will need to be migrated to the new one."; + } else { + return "The Rya Instance has been updated to use the provided Rya Streams subsystem. " + + "Rya Streams commands are now avaiable while connected to this instance."; + } + } + + @CliCommand(value = STREAMS_DETAILS_CMD, help = "Print information about which Rya Streams subsystem the Rya instance is connected to.") + public String printRyaStreamsDetails() { + final String ryaInstance = state.getShellState().getRyaInstanceName().get(); + final RyaClient client = state.getShellState().getConnectedCommands().get(); + try { + // Handle the case where the instance does not have Rya Details. + final Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance); + if(!details.isPresent()) { + return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem."; + } + + // Print a message based on if the instance is connected to Rya Streams. + final Optional<RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails(); + if(!streamsDetails.isPresent()) { + return "This instance of Rya has not been configured to use a Rya Streams subsystem."; + } + + // Print the details about which Rya Streams subsystem is being used. + return "Kafka Hostname: " + streamsDetails.get().getHostname() + ", Kafka Port: " + streamsDetails.get().getPort(); + + } catch (final RyaClientException e) { + throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e); + } + } + + @CliCommand(value = STREAM_QUERIES_ADD_CMD, help = "Add a SPARQL query to the Rya Streams subsystem.") + public String addQuery( + @CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true", + help = "Setting this flag will add the query, but not run it. (default: false)") + final boolean inactive) { + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + + // Prompt the user for the SPARQL that defines the query. + try { + final Optional<String> sparql = sparqlPrompt.getSparql(); + + // If the user aborted the prompt, return. + if(!sparql.isPresent()) { + return ""; + } + + final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive); + return "The added query's ID is " + streamsQuery.getQueryId(); + + } catch (final IOException | RyaStreamsException e) { + throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e); + } + } + + @CliCommand(value = STREAM_QUERIES_DELETE_CMD, help = "Delete a SPARQL query from the Rya Streams subsystem.") + public String deleteQuery( + @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to remove.") + final String queryId) { + + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + final UUID id = UUID.fromString(queryId); + try { + streamsClient.getDeleteQuery().delete(id); + } catch (final RyaStreamsException e) { + throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e); + } + return "The query has been deleted."; + } + + @CliCommand(value = STREAM_QUERIES_START_CMD, help = "Start processing a SPARQL query using the Rya Streams subsystem.") + public String startQuery( + @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to start processing.") + final String queryId) { + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + + try { + // Ensure the query exists. + final UUID id = UUID.fromString(queryId); + final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id); + if(!streamsQuery.isPresent()) { + throw new RuntimeException("No Rya Streams query exists for ID " + queryId); + } + + // Ensure it isn't already started. + if(streamsQuery.get().isActive()) { + return "That query is already running."; + } + + // Start it. + streamsClient.getStartQuery().start(id); + return "The query will be processed by the Rya Streams subsystem."; + + } catch (final RyaStreamsException e) { + throw new RuntimeException("Unable to start the Query.", e); + } + } + + @CliCommand(value = STREAM_QUERIES_STOP_CMD, help = "Stop processing a SPARQL query using the Rya Streams subsystem.") + public String stopQuery( + @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to stop processing.") + final String queryId) { + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + + try { + // Ensure the query exists. + final UUID id = UUID.fromString(queryId); + final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id); + if(!streamsQuery.isPresent()) { + throw new RuntimeException("No Rya Streams query exists for ID " + queryId); + } + + // Ensure it isn't already stopped. + if(!streamsQuery.get().isActive()) { + return "That query is already stopped."; + } + + // Stop it. + streamsClient.getStopQuery().stop(id); + return "The query will no longer be processed by the Rya Streams subsystem."; + + } catch (final RyaStreamsException e) { + throw new RuntimeException("Unable to start the Query.", e); + } + } + + @CliCommand(value = STREAM_QUERIES_LIST_CMD, help = "List the queries that are being managed by the configured Rya Streams subsystem.") + public String listQueries() { + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + try { + final Set<StreamsQuery> queries = streamsClient.getListQueries().all(); + return StreamsQueryFormatter.format(queries); + } catch (final RyaStreamsException e) { + throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e); + } catch (final Exception e) { + throw new RuntimeException("Unable to print the query to the console.", e); + } + } + + @CliCommand(value = STREAM_QUERIES_DETAILS_CMD, help = "Print detailed information about a specific query managed by the Rya Streams subsystem.") + public String printQueryDetails( + @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query whose details will be printed.") + final String queryId) { + final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get(); + final UUID id = UUID.fromString(queryId); + try { + final java.util.Optional<StreamsQuery> query = streamsClient.getGetQuery().getQuery(id); + if(!query.isPresent()) { + return "There is no query with the specified ID."; + } + return StreamsQueryFormatter.format(query.get()); + } catch (final RyaStreamsException e) { + throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e); + } catch (final Exception e) { + throw new RuntimeException("Unable to print the query to the console.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java index bb22fd3..58bcfbe 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java @@ -26,6 +26,8 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.streams.api.RyaStreamsClient; import com.google.common.base.Optional; import com.mongodb.MongoClient; @@ -159,6 +161,34 @@ public class SharedShellState { } /** + * This method indicates a shift into a state where the shell may have to interact with the Rya Streams subsystem. + * <p/> + * Stores the {@link RyaStreamsClient} all Rya Streams commands will be executed against. + * + * @param ryaStreamsCommands - Rya Streams commands that will execute against the Rya Streams subsystem. (not null) + */ + public void connectedToRyaStreams( + final RyaStreamsClient ryaStreamsCommands) { + requireNonNull(ryaStreamsCommands); + + lock.lock(); + try { + // Verify the Rya Shell is connected to an instance. + if(shellState.getConnectionState() != ConnectionState.CONNECTED_TO_INSTANCE) { + throw new IllegalStateException("You can not set the connected Rya Streams Client before connected to a Rya Instance."); + } + + // Set the connected Rya Streams commands. + shellState = ShellState.builder( shellState ) + .setRyaStreamsCommands(ryaStreamsCommands) + .build(); + + } finally { + lock.unlock(); + } + } + + /** * This method indicates a shift into the {@link DISCONNECTED} state. * <p/> * Clears all of the values associated with a Rya Storage/Instance connection. @@ -225,6 +255,7 @@ public class SharedShellState { private final Optional<MongoConnectionDetails> mongoDetails; private final Optional<MongoClient> mongoAdminClient; private final Optional<RyaClient> connectedCommands; + private final Optional<RyaStreamsClient> ryaStreamsCommands; // Instance specific values. private final Optional<String> instanceName; @@ -236,7 +267,8 @@ public class SharedShellState { final Optional<MongoConnectionDetails> mongoDetails, final Optional<MongoClient> mongoAdminClient, final Optional<RyaClient> connectedCommands, - final Optional<String> instanceName) { + final Optional<String> instanceName, + final Optional<RyaStreamsClient> ryaStreamsCommands) { this.connectionState = requireNonNull(connectionState); this.storageType = requireNonNull(storageType); this.accumuloDetails = requireNonNull(accumuloDetails); @@ -244,6 +276,7 @@ public class SharedShellState { this.mongoAdminClient = requireNonNull(mongoAdminClient); this.connectedCommands = requireNonNull(connectedCommands); this.instanceName = requireNonNull(instanceName); + this.ryaStreamsCommands = requireNonNull(ryaStreamsCommands); } /** @@ -294,6 +327,15 @@ public class SharedShellState { } /** + * @return The {@link RyaStreamsClient} to use when a command on the shell is issued. + * The value will not be present if the Rya Shell is not connected to an instance + * whose {@link RyaDetails} indicate when Rya Streams system to use. + */ + public Optional<RyaStreamsClient> getRyaStreamsCommands() { + return ryaStreamsCommands; + } + + /** * @return The name of the Rya Instance the Rya Shell is issuing commands to. * The value will not be present if the Rya Shell is not connected to a * storage or if a target instance has not been set yet. @@ -353,6 +395,7 @@ public class SharedShellState { private MongoConnectionDetails mongoDetails; private MongoClient mongoAdminClient; private RyaClient connectedCommands; + private RyaStreamsClient ryaStreamsCommands; // Instance specific values. private String instanceName; @@ -375,6 +418,7 @@ public class SharedShellState { this.mongoDetails = shellState.getMongoDetails().orNull(); this.mongoAdminClient = shellState.getMongoAdminClient().orNull(); this.connectedCommands = shellState.getConnectedCommands().orNull(); + this.ryaStreamsCommands = shellState.getRyaStreamsCommands().orNull(); this.instanceName = shellState.getRyaInstanceName().orNull(); } @@ -435,6 +479,15 @@ public class SharedShellState { } /** + * @param ryaStreamsCommands - The {@link RyaStreamsClient} to use when a command on the shell is issued. + * @return This {@link Builder} so that method invocations may be chained. + */ + public Builder setRyaStreamsCommands(@Nullable final RyaStreamsClient ryaStreamsCommands) { + this.ryaStreamsCommands = ryaStreamsCommands; + return this; + } + + /** * @param instanceName - The name of the Rya Instance the Rya Shell is issuing commands to. * @return This {@link Builder} so that method invocations may be chained. */ @@ -454,7 +507,8 @@ public class SharedShellState { Optional.fromNullable(mongoDetails), Optional.fromNullable(mongoAdminClient), Optional.fromNullable(connectedCommands), - Optional.fromNullable(instanceName)); + Optional.fromNullable(instanceName), + Optional.fromNullable(ryaStreamsCommands)); } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java new file mode 100644 index 0000000..6c06caf --- /dev/null +++ b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell.util; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.List; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; + +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Pretty formats {@link StreamsQuery} objects. + */ +@DefaultAnnotation(NonNull.class) +public final class StreamsQueryFormatter { + + /** + * Pretty formats a {@link StreamsQuery}. + * + * @param query - The query to format. (not null) + * @return The pretty formatted string. + * @throws Exception A problem was encountered while pretty formatting the SPARQL. + */ + public static String format(final StreamsQuery query) throws Exception { + requireNonNull(query); + + // Pretty format the SPARQL query. + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(query.getSparql(), null); + final String prettySparql = new SPARQLQueryRenderer().render(parsedQuery); + final String[] lines = prettySparql.split("\n"); + + // Create the formatted string. + query.getQueryId(); + query.isActive(); + + String.format(" QueryId: %s", query.getQueryId()); + + final StringBuilder builder = new StringBuilder(); + builder.append(" Query ID: ").append( query.getQueryId() ) .append("\n"); + builder.append("Is Active: ").append( query.isActive() ).append("\n"); + builder.append(" SPARQL: ").append( lines[0] ).append("\n"); + + for(int i = 1; i < lines.length; i++) { + builder.append(" ").append(lines[i]).append("\n"); + } + return builder.toString(); + } + + /** + * Pretty formats a collection {@link StreamsQuery}s. + * They will be sorted based on their Query IDs. + * + * @param queries - The queries to format. (not null) + * @return The pretty formatted string. + * @throws Exception A problem was encountered while pretty formatting the SPARQL. + */ + public static String format(final Collection<StreamsQuery> queries) throws Exception { + requireNonNull(queries); + + if(queries.size() == 1) { + return format(queries.iterator().next()); + } + + // Sort the queries based on their IDs. + final List<StreamsQuery> sorted = Lists.newArrayList(queries); + sorted.sort((query1, query2) -> { + final String id1 = query1.getQueryId().toString(); + final String id2 = query2.getQueryId().toString(); + return id1.compareTo(id2); + }); + + // Format a list of the queries. + final StringBuilder builder = new StringBuilder(); + builder.append("-----------------------------------------------\n"); + for(final StreamsQuery query : sorted) { + builder.append( format(query) ); + builder.append("-----------------------------------------------\n"); + } + return builder.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml index 361bf27..2473af1 100644 --- a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml +++ b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml @@ -31,7 +31,6 @@ <!-- Define the shell state bean that will be shared across all of the commands. --> <bean id="sharedShellState" class="org.apache.rya.shell.SharedShellState" /> <bean id="passwordPrompt" class="org.apache.rya.shell.util.PasswordPrompt.JLinePasswordPrompt" /> -<!-- <bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineAccumuloInstallPrompt" /> --> <bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineInstallPropmpt" /> <bean id="uninstallPrompt" class="org.apache.rya.shell.util.UninstallPrompt.JLineUninstallPrompt" /> <bean id="sparqlPrompt" class="org.apache.rya.shell.util.SparqlPrompt.JLineSparqlPrompt" /> @@ -41,11 +40,11 @@ <bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" /> <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" /> <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" /> + <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" /> <!-- <bean id="springHelpCommands" class="org.springframework.shell.commands.HelpCommands" /> <bean id="springScriptCommands" class="org.springframework.shell.commands.ScriptCommands" /> <bean id="springExitCommands" class="org.springframework.shell.commands.ExitCommands" /> --> - </beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java new file mode 100644 index 0000000..9f5a794 --- /dev/null +++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.GetInstanceDetails; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.apache.rya.shell.util.SparqlPrompt; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.interactor.AddQuery; +import org.apache.rya.streams.api.interactor.DeleteQuery; +import org.apache.rya.streams.api.interactor.GetQuery; +import org.apache.rya.streams.api.interactor.ListQueries; +import org.apache.rya.streams.api.interactor.StartQuery; +import org.apache.rya.streams.api.interactor.StopQuery; +import org.junit.Test; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link RyaStreamsCommands}. + */ +public class RyaStreamsCommandsTest { + + @Test + public void configureRyaStreams() throws Exception { + // Mock the object that performs the configure operation. + final RyaClient mockCommands = mock(RyaClient.class); + final SetRyaStreamsConfiguration setStreams = mock(SetRyaStreamsConfiguration.class); + when(mockCommands.getSetRyaStreamsConfiguration()).thenReturn(setStreams); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + state.connectedToInstance("unitTest"); + + // Verify that no Rya Streams Client is set to the state. + assertFalse(state.getShellState().getRyaStreamsCommands().isPresent()); + + try { + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.configureRyaStreams("localhost", 6); + + // Verify the request was forwarded to the mocked interactor. + final RyaStreamsDetails expectedDetails = new RyaStreamsDetails("localhost", 6); + verify(setStreams).setRyaStreamsConfiguration(eq("unitTest"), eq(expectedDetails)); + + // Verify a RyaStreamsClient was created and added to the state. + assertTrue(state.getShellState().getRyaStreamsCommands().isPresent()); + + // Verify the correct message is reported. + final String expected = "The Rya Instance has been updated to use the provided Rya Streams subsystem. " + + "Rya Streams commands are now avaiable while connected to this instance."; + assertEquals(expected, message); + } finally { + state.getShellState().getRyaStreamsCommands().get().close(); + } + } + + @Test + public void printRyaStreamsDetails_noRyaDetails() throws Exception { + // Mock the object that performs the configure operation. + final RyaClient mockCommands = mock(RyaClient.class); + final GetInstanceDetails getDetails = mock(GetInstanceDetails.class); + when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails); + + // When getting the instance details, ensure they are not found. + when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.absent()); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + state.connectedToInstance("unitTest"); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.printRyaStreamsDetails(); + final String expected = "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem."; + assertEquals(expected, message); + } + + @Test + public void printRyaStreamsDetails_notConfigured() throws Exception { + // Mock the object that performs the configure operation. + final RyaClient mockCommands = mock(RyaClient.class); + final GetInstanceDetails getDetails = mock(GetInstanceDetails.class); + when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails); + + // When getting the instance details, ensure they do not have RyaStreamsDetails to print. + final RyaDetails details = mock(RyaDetails.class); + when(details.getRyaStreamsDetails()).thenReturn(Optional.absent()); + when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details)); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + state.connectedToInstance("unitTest"); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.printRyaStreamsDetails(); + final String expected = "This instance of Rya has not been configured to use a Rya Streams subsystem."; + assertEquals(expected, message); + } + + @Test + public void printRyaStreamsDetails_configured() throws Exception { + // Mock the object that performs the configure operation. + final RyaClient mockCommands = mock(RyaClient.class); + final GetInstanceDetails getDetails = mock(GetInstanceDetails.class); + when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails); + + // When getting the instance details, ensure they do have RyaStreamsDetails to print. + final RyaDetails details = mock(RyaDetails.class); + when(details.getRyaStreamsDetails()).thenReturn(Optional.of(new RyaStreamsDetails("localhost", 6))); + when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details)); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + state.connectedToInstance("unitTest"); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.printRyaStreamsDetails(); + final String expected = "Kafka Hostname: localhost, Kafka Port: 6"; + assertEquals(expected, message); + } + + @Test + public void addQuery_userAbortsSparqlPrompt() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final AddQuery addQuery = mock(AddQuery.class); + when(mockClient.getAddQuery()).thenReturn(addQuery); + + // Mock a SPARQL prompt that a user aborts. + final SparqlPrompt prompt = mock(SparqlPrompt.class); + when(prompt.getSparql()).thenReturn(Optional.absent()); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt); + final String message = commands.addQuery(false); + + // Verify a message is printed to the user. + assertEquals("", message); + } + + @Test + public void addQuery() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final AddQuery addQuery = mock(AddQuery.class); + when(mockClient.getAddQuery()).thenReturn(addQuery); + + final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), "query", true); + when(addQuery.addQuery(eq("query"), eq(true))).thenReturn(addedQuery); + + // Mock a SPARQL prompt that a user entered a query through. + final SparqlPrompt prompt = mock(SparqlPrompt.class); + when(prompt.getSparql()).thenReturn(Optional.of("query")); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt); + final String message = commands.addQuery(false); + + // Verify the interactor was invoked with the provided input. + verify(addQuery).addQuery("query", true); + + // Verify a message is printed to the user. + final String expected = "The added query's ID is " + addedQuery.getQueryId(); + assertEquals(expected, message); + } + + @Test + public void deleteQuery() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final DeleteQuery deleteQuery = mock(DeleteQuery.class); + when(mockClient.getDeleteQuery()).thenReturn(deleteQuery); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final UUID queryId = UUID.randomUUID(); + final String message = commands.deleteQuery(queryId.toString()); + + // Verify the interactor was invoked with the provided parameters. + verify(deleteQuery).delete(eq(queryId)); + + // Verify a message is printed to the user. + final String expected = "The query has been deleted."; + assertEquals(expected, message); + } + + @Test + public void startQuery() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final StartQuery startQuery = mock(StartQuery.class); + when(mockClient.getStartQuery()).thenReturn(startQuery); + final GetQuery getQuery = mock(GetQuery.class); + when(mockClient.getGetQuery()).thenReturn(getQuery); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Report the query as not running. + final UUID queryId = UUID.randomUUID(); + when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false))); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.startQuery(queryId.toString()); + + // Verify the interactor was invoked with the provided parameters. + verify(startQuery).start(queryId); + + // Verify a message is printed to the user. + final String expected = "The query will be processed by the Rya Streams subsystem."; + assertEquals(expected, message); + } + + @Test + public void startQuery_alreadyRunning() throws Exception{ + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final StartQuery startQuery = mock(StartQuery.class); + when(mockClient.getStartQuery()).thenReturn(startQuery); + final GetQuery getQuery = mock(GetQuery.class); + when(mockClient.getGetQuery()).thenReturn(getQuery); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Report the query as running. + final UUID queryId = UUID.randomUUID(); + when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true))); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.startQuery(queryId.toString()); + + // Verify the interactor was not invoked. + verify(startQuery, never()).start(queryId); + + // Verify a message is printed to the user. + final String expected = "That query is already running."; + assertEquals(expected, message); + } + + @Test + public void stopQuery() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final StopQuery stopQuery = mock(StopQuery.class); + when(mockClient.getStopQuery()).thenReturn(stopQuery); + final GetQuery getQuery = mock(GetQuery.class); + when(mockClient.getGetQuery()).thenReturn(getQuery); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Report the query as running. + final UUID queryId = UUID.randomUUID(); + when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true))); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.stopQuery(queryId.toString()); + + // Verify the interactor was invoked with the provided parameters. + verify(stopQuery).stop(queryId); + + // Verify a message is printed to the user. + final String expected = "The query will no longer be processed by the Rya Streams subsystem."; + assertEquals(expected, message); + } + + @Test + public void stopQuery_alreadyStopped() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final StopQuery stopQuery = mock(StopQuery.class); + when(mockClient.getStopQuery()).thenReturn(stopQuery); + final GetQuery getQuery = mock(GetQuery.class); + when(mockClient.getGetQuery()).thenReturn(getQuery); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Report the query as not running. + final UUID queryId = UUID.randomUUID(); + when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false))); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.stopQuery(queryId.toString()); + + // Verify the interactor was not invoked with the provided parameters. + verify(stopQuery, never()).stop(queryId); + + // Verify a message is printed to the user. + final String expected = "That query is already stopped."; + assertEquals(expected, message); + } + + @Test + public void listQueries() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final ListQueries listQueries = mock(ListQueries.class); + when(mockClient.getListQueries()).thenReturn(listQueries); + + final Set<StreamsQuery> queries = Sets.newHashSet( + new StreamsQuery( + UUID.fromString("33333333-3333-3333-3333-333333333333"), + "SELECT * WHERE { ?person <urn:worksAt> ?business . }", + true), + new StreamsQuery( + UUID.fromString("11111111-1111-1111-1111-111111111111"), + "SELECT * WHERE { ?a ?b ?c . }", + true), + new StreamsQuery( + UUID.fromString("22222222-2222-2222-2222-222222222222"), + "SELECT * WHERE { ?d ?e ?f . }", + false)); + when(listQueries.all()).thenReturn(queries); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.listQueries(); + + // Verify the correct report is returned. + final String expected = + "-----------------------------------------------\n" + + " Query ID: 11111111-1111-1111-1111-111111111111\n" + + "Is Active: true\n" + + " SPARQL: select ?a ?b ?c\n" + + " where {\n" + + " ?a ?b ?c.\n" + + " }\n" + + "-----------------------------------------------\n" + + " Query ID: 22222222-2222-2222-2222-222222222222\n" + + "Is Active: false\n" + + " SPARQL: select ?d ?e ?f\n" + + " where {\n" + + " ?d ?e ?f.\n" + + " }\n" + + "-----------------------------------------------\n" + + " Query ID: 33333333-3333-3333-3333-333333333333\n" + + "Is Active: true\n" + + " SPARQL: select ?person ?business\n" + + " where {\n" + + " ?person <urn:worksAt> ?business.\n" + + " }\n" + + "-----------------------------------------------\n"; + assertEquals(expected, message); + } + + @Test + public void printQueryDetails() throws Exception { + // Mock the object that performs the rya streams operation. + final RyaStreamsClient mockClient = mock(RyaStreamsClient.class); + final GetQuery getQuery = mock(GetQuery.class); + when(mockClient.getGetQuery()).thenReturn(getQuery); + + final UUID queryId = UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"); + final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { ?a ?b ?c . }", true); + when(getQuery.getQuery(queryId)).thenReturn(java.util.Optional.of(query)); + + // Mock a shell state and connect it to a Rya instance. + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class)); + state.connectedToInstance("unitTest"); + state.connectedToRyaStreams(mockClient); + + // Execute the command. + final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class)); + final String message = commands.printQueryDetails(queryId.toString()); + + // Verify the correct report is returned. + final String expected = + " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" + + "Is Active: true\n" + + " SPARQL: select ?a ?b ?c\n" + + " where {\n" + + " ?a ?b ?c.\n" + + " }\n"; + assertEquals(expected, message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java index b5f136c..e1ad039 100644 --- a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java +++ b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java @@ -25,6 +25,7 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.shell.SharedShellState.ConnectionState; import org.apache.rya.shell.SharedShellState.ShellState; +import org.apache.rya.streams.api.RyaStreamsClient; import org.junit.Test; /** @@ -163,4 +164,34 @@ public class SharedShellStateTest { .build(); assertEquals(expected, state.getShellState()); } + + @Test(expected = IllegalStateException.class) + public void connectedToRyaStreams_notConnectedToInstance() throws Exception { + // Create a shell state that is not connected to an instance. + final SharedShellState state = new SharedShellState(); + + // Connecting to a Rya Streams system fails. + state.connectedToRyaStreams( mock(RyaStreamsClient.class) ); + } + + @Test + public void connectedToRyaStreams() { + // Create a shell state. + final SharedShellState state = new SharedShellState(); + + // Connect to Accumulo. + final AccumuloConnectionDetails connectionDetails = mock(AccumuloConnectionDetails.class); + final RyaClient connectedCommands = mock(RyaClient.class); + state.connectedToAccumulo(connectionDetails, connectedCommands); + + // Connect to an Instance. + state.connectedToInstance("instance"); + + // Connect to Rya Streams for the instance. + final RyaStreamsClient streamsClient = mock(RyaStreamsClient.class); + state.connectedToRyaStreams(streamsClient); + + // Verify the state. + assertEquals(streamsClient, state.getShellState().getRyaStreamsCommands().get()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java new file mode 100644 index 0000000..8e5d251 --- /dev/null +++ b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell.util; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link StreamsQueryFormatter}. + */ +public class StreamsQueryFormatterTest { + + @Test + public void formatQuery() throws Exception { + // Format the query. + final StreamsQuery query = new StreamsQuery( + UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"), + "SELECT * WHERE { ?a ?b ?c . }", + true); + final String formatted = StreamsQueryFormatter.format(query); + + // Ensure it has the expected format. + final String expected = + " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" + + "Is Active: true\n" + + " SPARQL: select ?a ?b ?c\n" + + " where {\n" + + " ?a ?b ?c.\n" + + " }\n"; + + assertEquals(expected, formatted); + } + + @Test + public void formatQueries() throws Exception { + // Format the queries. + final Set<StreamsQuery> queries = Sets.newHashSet( + new StreamsQuery( + UUID.fromString("33333333-3333-3333-3333-333333333333"), + "SELECT * WHERE { ?person <urn:worksAt> ?business . }", + true), + new StreamsQuery( + UUID.fromString("11111111-1111-1111-1111-111111111111"), + "SELECT * WHERE { ?a ?b ?c . }", + true), + new StreamsQuery( + UUID.fromString("22222222-2222-2222-2222-222222222222"), + "SELECT * WHERE { ?d ?e ?f . }", + false)); + + final String formatted = StreamsQueryFormatter.format(queries); + + // Ensure it has the expected format. + final String expected = + "-----------------------------------------------\n" + + " Query ID: 11111111-1111-1111-1111-111111111111\n" + + "Is Active: true\n" + + " SPARQL: select ?a ?b ?c\n" + + " where {\n" + + " ?a ?b ?c.\n" + + " }\n" + + "-----------------------------------------------\n" + + " Query ID: 22222222-2222-2222-2222-222222222222\n" + + "Is Active: false\n" + + " SPARQL: select ?d ?e ?f\n" + + " where {\n" + + " ?d ?e ?f.\n" + + " }\n" + + "-----------------------------------------------\n" + + " Query ID: 33333333-3333-3333-3333-333333333333\n" + + "Is Active: true\n" + + " SPARQL: select ?person ?business\n" + + " where {\n" + + " ?person <urn:worksAt> ?business.\n" + + " }\n" + + "-----------------------------------------------\n"; + assertEquals(expected, formatted); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/test/resources/RyaShellTest-context.xml ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/resources/RyaShellTest-context.xml b/extras/shell/src/test/resources/RyaShellTest-context.xml index f7ffe0f..54c44cf 100644 --- a/extras/shell/src/test/resources/RyaShellTest-context.xml +++ b/extras/shell/src/test/resources/RyaShellTest-context.xml @@ -60,4 +60,5 @@ <bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" /> <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" /> <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" /> + <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" /> </beans> \ No newline at end of file
