RYA-377 Implemented integration tests for the client commands.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/7deb0c00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/7deb0c00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/7deb0c00 Branch: refs/heads/master Commit: 7deb0c0026feb71fa33b1c4ea95bf46e4ff3f112 Parents: 27255cc Author: kchilton2 <[email protected]> Authored: Fri Oct 27 20:43:38 2017 -0400 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../interactor/defaults/DefaultListQueries.java | 7 +- .../rya/streams/client/RyaStreamsCommand.java | 29 ++- .../streams/client/command/AddQueryCommand.java | 17 +- .../client/command/DeleteQueryCommand.java | 19 +- .../client/command/ListQueriesCommand.java | 17 +- .../client/command/LoadStatementsCommand.java | 12 +- .../client/command/AddQueryCommandIT.java | 103 ++++++++++- .../client/command/DeleteQueryCommandIT.java | 153 +++++++++++++--- .../client/command/ListQueryCommandIT.java | 97 ++++++++-- .../client/command/LoadStatementsCommandIT.java | 181 +++++++++++++++++++ .../client/src/test/resources/statements.ttl | 21 +++ .../apache/rya/streams/kafka/KafkaTopics.java | 52 ++++++ 12 files changed, 619 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java index 82ca691..946944f 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java @@ -38,10 +38,9 @@ public class DefaultListQueries implements ListQueries { private final QueryRepository repository; /** - * Creates a new {@link DefaultAddQuery}. + * Creates a new {@link DefaultListQueries}. * - * @param repository - The {@link QueryRepository} to add a query to. (not - * null) + * @param repository - The {@link QueryRepository} that hosts the listed queries. (not null) */ public DefaultListQueries(final QueryRepository repository) { this.repository = requireNonNull(repository); @@ -51,4 +50,4 @@ public class DefaultListQueries implements ListQueries { public Set<StreamsQuery> all() throws RyaStreamsException { return repository.list(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java index 967b79e..5d64785 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java @@ -26,23 +26,22 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; /** - * A command that may be executed by the {@link PcjAdminClient}. + * A command that may be executed by the Rya Streams {@link CLIDriver}. */ @DefaultAnnotation(NonNull.class) public interface RyaStreamsCommand { + /** - * Command line parameters that are used by this command to configure - * itself. + * Command line parameters that are used by all commands that interact with Kafka. */ - class Parameters { - @Parameter(names = { "--topic", - "-t" }, required = true, description = "The kafka topic to load the statements into.") - public String topicName; - @Parameter(names = { "--kafkaPort", - "-p" }, required = true, description = "The port to use to connect to Kafka.") + class KafkaParameters { + @Parameter(names = {"--ryaInstance", "-r"}, required = true, description = "The name of the Rya Instance the Rya Streams is a part of.") + public String ryaInstance; + + @Parameter(names = { "--kafkaPort", "-p" }, required = true, description = "The port to use to connect to Kafka.") public String kafkaPort; - @Parameter(names = { "--kafkaHostname", - "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.") + + @Parameter(names = { "--kafkaHostname", "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.") public String kafkaIP; @Override @@ -51,8 +50,8 @@ public interface RyaStreamsCommand { parameters.append("Parameters"); parameters.append("\n"); - if (!Strings.isNullOrEmpty(topicName)) { - parameters.append("\tTopic: " + topicName); + if(!Strings.isNullOrEmpty(ryaInstance)) { + parameters.append("\tRya Instance Name: " + ryaInstance + "\n"); } if (!Strings.isNullOrEmpty(kafkaIP)) { @@ -82,7 +81,7 @@ public interface RyaStreamsCommand { * @return Describes what arguments may be provided to the command. */ default public String getUsage() { - final JCommander parser = new JCommander(new Parameters()); + final JCommander parser = new JCommander(new KafkaParameters()); final StringBuilder usage = new StringBuilder(); parser.usage(usage); @@ -127,4 +126,4 @@ public interface RyaStreamsCommand { super(message, cause); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java index dfaa6c6..8439f20 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java @@ -40,6 +40,7 @@ import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; @@ -62,12 +63,10 @@ public class AddQueryCommand implements RyaStreamsCommand { private static final Logger log = LoggerFactory.getLogger(AddQueryCommand.class); /** - * Command line parameters that are used by this command to configure - * itself. + * Command line parameters that are used by this command to configure itself. */ - private class AddParameters extends RyaStreamsCommand.Parameters { - @Parameter(names = { "--query", - "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.") + private class AddParameters extends RyaStreamsCommand.KafkaParameters { + @Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.") private String query; @Override @@ -121,15 +120,19 @@ public class AddQueryCommand implements RyaStreamsCommand { producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + final Properties consumerProperties = new Properties(); consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); final QueryRepository repo = new InMemoryQueryRepository(changeLog); + + // Execute the add query command. final AddQuery addQuery = new DefaultAddQuery(repo); try { final StreamsQuery query = addQuery.addQuery(params.query); @@ -140,4 +143,4 @@ public class AddQueryCommand implements RyaStreamsCommand { log.trace("Finished executing the Add Query Command."); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java index 65a7017..b101a0f 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java @@ -40,6 +40,7 @@ import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; @@ -62,10 +63,9 @@ public class DeleteQueryCommand implements RyaStreamsCommand { private static final Logger log = LoggerFactory.getLogger(DeleteQueryCommand.class); /** - * Command line parameters that are used by this command to configure - * itself. + * Command line parameters that are used by this command to configure itself. */ - private class RemoveParameters extends RyaStreamsCommand.Parameters { + private class RemoveParameters extends RyaStreamsCommand.KafkaParameters { @Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to remove from Rya Streams.") private String queryId; @@ -73,7 +73,6 @@ public class DeleteQueryCommand implements RyaStreamsCommand { public String toString() { final StringBuilder parameters = new StringBuilder(); parameters.append(super.toString()); - parameters.append("\n"); if (!Strings.isNullOrEmpty(queryId)) { parameters.append("\tQueryID: " + queryId); @@ -120,23 +119,27 @@ public class DeleteQueryCommand implements RyaStreamsCommand { producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + final Properties consumerProperties = new Properties(); consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); final QueryRepository repo = new InMemoryQueryRepository(changeLog); + + // Execute the delete query command. final DeleteQuery deleteQuery = new DefaultDeleteQuery(repo); try { deleteQuery.delete(UUID.fromString(params.queryId)); log.trace("Deleted query: " + params.queryId); } catch (final RyaStreamsException e) { - log.error("Unable to parse query: " + params.queryId, e); + log.error("Unable to delete query with ID: " + params.queryId, e); } - log.trace("Finished executing the Add Query Command."); + log.trace("Finished executing the Delete Query Command."); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java index ec40b50..c4e5de6 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java @@ -41,6 +41,7 @@ import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; @@ -75,7 +76,7 @@ public class ListQueriesCommand implements RyaStreamsCommand { requireNonNull(args); // Parse the command line arguments. - final Parameters params = new Parameters(); + final KafkaParameters params = new KafkaParameters(); try { new JCommander(params, args); } catch (final ParameterException e) { @@ -88,25 +89,29 @@ public class ListQueriesCommand implements RyaStreamsCommand { producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + final Properties consumerProperties = new Properties(); consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); final QueryRepository repo = new InMemoryQueryRepository(changeLog); + + // Execute the list queries command. final ListQueries listQueries = new DefaultListQueries(repo); try { final Set<StreamsQuery> queries = listQueries.all(); - logQueries(queries); + System.out.println( formatQueries(queries) ); } catch (final RyaStreamsException e) { log.error("Unable to retrieve the queries.", e); } } - private void logQueries(final Set<StreamsQuery> queries) { + private String formatQueries(final Set<StreamsQuery> queries) { final StringBuilder sb = new StringBuilder(); sb.append("\n"); sb.append("Queries in Rya Streams:\n"); @@ -119,6 +124,6 @@ public class ListQueriesCommand implements RyaStreamsCommand { sb.append(query.getSparql()); sb.append("\n"); }); - log.trace(sb.toString()); + return sb.toString(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java index 057de77..4763bd8 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java @@ -27,10 +27,13 @@ import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.api.interactor.LoadStatements; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +56,11 @@ public class LoadStatementsCommand implements RyaStreamsCommand { /** * Command line parameters that are used by this command to configure itself. */ - private static final class LoadStatementsParameters extends RyaStreamsCommand.Parameters { + private static final class LoadStatementsParameters extends RyaStreamsCommand.KafkaParameters { + @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.") private String statementsFile; + @Parameter(names= {"--visibilities", "-v"}, required = true, description = "The visibilities to assign to the statements being loaded in.") private String visibilities; @@ -63,7 +68,6 @@ public class LoadStatementsCommand implements RyaStreamsCommand { public String toString() { final StringBuilder parameters = new StringBuilder(); parameters.append(super.toString()); - parameters.append("\n"); if (!Strings.isNullOrEmpty(statementsFile)) { parameters.append("\tStatements File: " + statementsFile); @@ -117,7 +121,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand { final Properties producerProps = buildProperties(params); try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) { - final LoadStatements statements = new KafkaLoadStatements(params.topicName, producer); + final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer); statements.load(statementsPath, params.visibilities); } catch (final Exception e) { log.error("Unable to parse statements file: " + statementsPath.toString(), e); @@ -130,6 +134,8 @@ public class LoadStatementsCommand implements RyaStreamsCommand { requireNonNull(params); final Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName()); return props; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java index 6b13b46..09e874c 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java @@ -21,10 +21,30 @@ package org.apache.rya.streams.client.command; import static org.junit.Assert.assertEquals; import java.util.Properties; +import java.util.Set; +import java.util.UUID; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -33,7 +53,15 @@ import org.junit.Test; * integration Test for adding a new query through a command. */ public class AddQueryCommandIT extends KafkaITBase { - private String[] args; + + private final String ryaInstance = UUID.randomUUID().toString(); + + private String kafkaIp; + private String kafkaPort; + private QueryRepository queryRepo; + + private Producer<?, QueryChange> queryProducer = null; + private Consumer<?, QueryChange> queryConsumer = null; @Rule public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); @@ -43,19 +71,74 @@ public class AddQueryCommandIT extends KafkaITBase { final Properties props = rule.createBootstrapServerConfig(); final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); final String[] tokens = location.split(":"); - args = new String[] { - "-q", "Some sparql query", - "-t", rule.getKafkaTopicName(), - "-p", tokens[1], - "-i", tokens[0] + + kafkaIp = tokens[0]; + kafkaPort = tokens[1]; + + // Initialize the QueryRepository. + final Properties producerProperties = new Properties(); + producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + + queryProducer = new KafkaProducer<>(producerProperties); + queryConsumer = new KafkaConsumer<>(consumerProperties); + + final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); + queryRepo = new InMemoryQueryRepository(changeLog); + } + + @After + public void cleanup() { + queryProducer.close(); + queryConsumer.close(); + } + + @Test + public void shortParams() throws Exception { + // Arguments that add a query to Rya Streams. + final String query = "SELECT * WHERE { ?person <urn:name> ?name }"; + final String[] args = new String[] { + "-r", "" + ryaInstance, + "-i", kafkaIp, + "-p", kafkaPort, + "-q", query }; + + // Execute the command. + final AddQueryCommand command = new AddQueryCommand(); + command.execute(args); + + // Show that the query was added to the Query Repository. + final Set<StreamsQuery> queries = queryRepo.list(); + assertEquals(1, queries.size()); + assertEquals(query, queries.iterator().next().getSparql()); } @Test - public void happyAddQueryTest() throws Exception { + public void longParams() throws Exception { + // Arguments that add a query to Rya Streams. + final String query = "SELECT * WHERE { ?person <urn:name> ?name }"; + final String[] args = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafkaIp, + "--kafkaPort", kafkaPort, + "--query", query + }; + + // Execute the command. final AddQueryCommand command = new AddQueryCommand(); command.execute(args); - // not sure what to assert here. - assertEquals(true, true); + + // Show that the query was added to the Query Repository. + final Set<StreamsQuery> queries = queryRepo.list(); + assertEquals(1, queries.size()); + assertEquals(query, queries.iterator().next().getSparql()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java index db8c200..0079371 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java @@ -19,25 +19,49 @@ package org.apache.rya.streams.client.command; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.UUID; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.Lists; - /** * Integration Test for deleting a query from Rya Streams through a command. */ public class DeleteQueryCommandIT extends KafkaITBase { - private List<String> args; + + private final String ryaInstance = UUID.randomUUID().toString(); + + private String kafkaIp; + private String kafkaPort; + + private Producer<?, QueryChange> queryProducer = null; + private Consumer<?, QueryChange> queryConsumer = null; @Rule public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); @@ -48,28 +72,107 @@ public class DeleteQueryCommandIT extends KafkaITBase { final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); final String[] tokens = location.split(":"); - args = Lists.newArrayList( - "-t", rule.getKafkaTopicName(), - "-p", tokens[1], - "-i", tokens[0] - ); + kafkaIp = tokens[0]; + kafkaPort = tokens[1]; + } + + /** + * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need + * to re-create the repo outside of the command to ensure it has the most up to date values inside of it. + */ + private QueryRepository makeQueryRepository() { + final Properties producerProperties = new Properties(); + producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + + cleanup(); + queryProducer = new KafkaProducer<>(producerProperties); + queryConsumer = new KafkaConsumer<>(consumerProperties); + + final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); + return new InMemoryQueryRepository(changeLog); + } + + @After + public void cleanup() { + if(queryProducer != null) { + queryProducer.close(); + } + if(queryConsumer != null) { + queryConsumer.close(); + } } @Test - public void happyDeleteQueryTest() throws Exception { - // add a query so that it can be removed. - final List<String> addArgs = new ArrayList<>(args); - addArgs.add("-q"); - addArgs.add("Some sparql query"); - final AddQueryCommand addCommand = new AddQueryCommand(); - addCommand.execute(addArgs.toArray(new String[] {})); - - final List<String> deleteArgs = new ArrayList<>(args); - addArgs.add("-q"); - addArgs.add("12345"); + public void shortParams() throws Exception { + // Add a few queries to Rya Streams. + QueryRepository repo = makeQueryRepository(); + repo.add("query1"); + final UUID query2Id = repo.add("query2").getQueryId(); + repo.add("query3"); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = repo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "-r", "" + ryaInstance, + "-i", kafkaIp, + "-p", kafkaPort, + "-q", query2Id.toString() + }; + + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + repo = makeQueryRepository(); + queries = repo.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); + } + } + + @Test + public void longParams() throws Exception { + // Add a few queries to Rya Streams. + QueryRepository repo = makeQueryRepository(); + repo.add("query1"); + final UUID query2Id = repo.add("query2").getQueryId(); + repo.add("query3"); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = repo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafkaIp, + "--kafkaPort", kafkaPort, + "--queryID", query2Id.toString() + }; + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); - deleteCommand.execute(deleteArgs.toArray(new String[] {})); - // not sure what to assert here. - assertEquals(true, true); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + repo = makeQueryRepository(); + queries = repo.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java index be90c5f..eb759ba 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -18,13 +18,29 @@ */ package org.apache.rya.streams.client.command; -import static org.junit.Assert.assertEquals; - import java.util.Properties; +import java.util.UUID; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -33,7 +49,15 @@ import org.junit.Test; * integration Test for listing queries through a command. */ public class ListQueryCommandIT extends KafkaITBase { - private String[] args; + + private final String ryaInstance = UUID.randomUUID().toString(); + + private String kafkaIp; + private String kafkaPort; + private QueryRepository queryRepo; + + private Producer<?, QueryChange> queryProducer = null; + private Consumer<?, QueryChange> queryConsumer = null; @Rule public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); @@ -43,18 +67,69 @@ public class ListQueryCommandIT extends KafkaITBase { final Properties props = rule.createBootstrapServerConfig(); final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); final String[] tokens = location.split(":"); - args = new String[] { - "-t", rule.getKafkaTopicName(), - "-p", tokens[1], - "-i", tokens[0] + + kafkaIp = tokens[0]; + kafkaPort = tokens[1]; + + // Initialize the QueryRepository. + final Properties producerProperties = new Properties(); + producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + + queryProducer = new KafkaProducer<>(producerProperties); + queryConsumer = new KafkaConsumer<>(consumerProperties); + + final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); + queryRepo = new InMemoryQueryRepository(changeLog); + } + + @After + public void cleanup() { + queryProducer.close(); + queryConsumer.close(); + } + + + @Test + public void shortParams() throws Exception { + // Add a few queries to Rya Streams. + queryRepo.add("query1"); + queryRepo.add("query2"); + queryRepo.add("query3"); + + // Execute the List Queries command. + final String[] args = new String[] { + "-r", "" + ryaInstance, + "-i", kafkaIp, + "-p", kafkaPort }; + + final ListQueriesCommand command = new ListQueriesCommand(); + command.execute(args); } @Test - public void happyListQueriesTest() throws Exception { + public void longParams() throws Exception { + // Add a few queries to Rya Streams. + queryRepo.add("query1"); + queryRepo.add("query2"); + queryRepo.add("query3"); + + // Execute the List Queries command. + final String[] args = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafkaIp, + "--kafkaPort", kafkaPort + }; + final ListQueriesCommand command = new ListQueriesCommand(); command.execute(args); - // not sure what to assert here. - assertEquals(true, true); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java new file mode 100644 index 0000000..95a4876 --- /dev/null +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.command; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +/** + * Integration tests the methods of {@link LoadStatementsCommand}. + */ +public class LoadStatementsCommandIT { + + private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl"); + + private final String ryaInstance = UUID.randomUUID().toString(); + + private String kafkaIp; + private String kafkaPort; + + @Rule + public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); + + @Before + public void setup() { + final Properties props = rule.createBootstrapServerConfig(); + final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + final String[] tokens = location.split(":"); + + kafkaIp = tokens[0]; + kafkaPort = tokens[1]; + } + + @Test + public void shortParams() throws Exception { + // Load a file of statements into Kafka. + final String visibilities = "a|b|c"; + final String[] args = new String[] { + "-r", "" + ryaInstance, + "-i", kafkaIp, + "-p", kafkaPort, + "-f", TURTLE_FILE.toString(), + "-v", visibilities + }; + + new LoadStatementsCommand().execute(args); + + // Show that the statements were loaded into the topic. + // Read a VisibilityBindingSet from the test topic. + final List<VisibilityStatement> read = new ArrayList<>(); + + final Properties consumerProps = new Properties(); + consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); + + try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { + final String topic = KafkaTopics.statementsTopic(ryaInstance); + consumer.subscribe(Arrays.asList(topic)); + final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000); + + assertEquals(3, records.count()); + final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator(); + while(iter.hasNext()) { + final VisibilityStatement visiSet = iter.next().value(); + read.add(visiSet); + } + } + + final ValueFactory VF = ValueFactoryImpl.getInstance(); + final List<VisibilityStatement> expected = new ArrayList<>(); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")), + visibilities)); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")), + visibilities)); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")), + visibilities)); + + // Show the written statements matches the read ones. + assertEquals(expected, read); + } + + @Test + public void longParams() throws Exception { + // Load a file of statements into Kafka. + final String visibilities = "a|b|c"; + final String[] args = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafkaIp, + "--kafkaPort", kafkaPort, + "--statementsFile", TURTLE_FILE.toString(), + "--visibilities", visibilities + }; + + new LoadStatementsCommand().execute(args); + + // Show that the statements were loaded into the topic. + // Read a VisibilityBindingSet from the test topic. + final List<VisibilityStatement> read = new ArrayList<>(); + + final Properties consumerProps = new Properties(); + consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); + + try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { + final String topic = KafkaTopics.statementsTopic(ryaInstance); + consumer.subscribe(Arrays.asList(topic)); + final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000); + + assertEquals(3, records.count()); + final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator(); + while(iter.hasNext()) { + final VisibilityStatement visiSet = iter.next().value(); + read.add(visiSet); + } + } + + final ValueFactory VF = ValueFactoryImpl.getInstance(); + final List<VisibilityStatement> expected = new ArrayList<>(); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")), + visibilities)); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")), + visibilities)); + expected.add(new VisibilityStatement( + VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")), + visibilities)); + + // Show the written statements matches the read ones. + assertEquals(expected, read); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/client/src/test/resources/statements.ttl ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/resources/statements.ttl b/extras/rya.streams/client/src/test/resources/statements.ttl new file mode 100644 index 0000000..c19e22d --- /dev/null +++ b/extras/rya.streams/client/src/test/resources/statements.ttl @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +@prefix example: <http://example#> . + +example:alice example:talksTo example:bob . +example:bob example:talksTo example:charlie . +example:charlie example:likes example:icecream . http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7deb0c00/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java new file mode 100644 index 0000000..dfc4c9d --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import org.apache.rya.streams.api.queries.QueryChangeLog; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates the Kafka topic names that are used for Rya Streams systems. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaTopics { + + /** + * Creates the Kafka topic that will be used for a specific instance of Rya's {@link QueryChangeLog}. + * + * @param ryaInstance - The Rya instance the change log is for. (not null) + * @return The name of the Kafka topic. + */ + public static String queryChangeLogTopic(final String ryaInstance) { + return ryaInstance + "-QueryChangeLog"; + } + + /** + * Creates the Kafka topic that will be used to load statements into the Rya Streams system for a specific + * instance of Rya. + * + * @param ryaInstance - The Rya instance the statements are for. (not null) + * @return The name of the Kafka topic. + */ + public static String statementsTopic(final String ryaInstance) { + return ryaInstance + "-Statements"; + } +} \ No newline at end of file
