RYA-377 Implement the stream query command in the client and fix a bunch of client bugs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f3ac7df1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f3ac7df1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f3ac7df1 Branch: refs/heads/master Commit: f3ac7df1e9e707eb2e28e1c1c70212b89b716163 Parents: fc9775e Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Thu Nov 2 12:10:31 2017 -0400 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../api/queries/InMemoryQueryChangeLog.java | 5 + .../api/queries/InMemoryQueryRepository.java | 10 ++ .../rya/streams/api/queries/QueryChangeLog.java | 2 +- .../streams/api/queries/QueryRepository.java | 2 +- .../apache/rya/streams/client/CLIDriver.java | 40 ++++-- .../rya/streams/client/RyaStreamsCommand.java | 8 ++ .../streams/client/command/AddQueryCommand.java | 77 ++++------ .../client/command/DeleteQueryCommand.java | 72 ++++------ .../client/command/ListQueriesCommand.java | 70 ++++----- .../client/command/LoadStatementsCommand.java | 25 ++-- .../client/command/StreamResultsCommand.java | 141 +++++++++++++++++++ .../client/src/main/resources/log4j.properties | 27 ++++ .../client/command/AddQueryCommandIT.java | 3 +- .../client/command/DeleteQueryCommandIT.java | 3 +- .../client/command/ListQueryCommandIT.java | 3 +- .../kafka/queries/KafkaQueryChangeLog.java | 30 +++- .../queries/KafkaQueryChangeLogFactory.java | 74 ++++++++++ 17 files changed, 427 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java index 71af850..f0f628e 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryChangeLog.java @@ -71,6 +71,11 @@ public class InMemoryQueryChangeLog implements QueryChangeLog { } } + @Override + public void close() throws Exception { + // Nothing to do here. + } + /** * A {@link CloseableIteration} that iterates over a list. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index 652b16f..c1048fc 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -131,6 +131,16 @@ public class InMemoryQueryRepository implements QueryRepository { } } + @Override + public void close() throws Exception { + lock.lock(); + try { + changeLog.close(); + } finally { + lock.unlock(); + } + } + /** * A {@link Map} from query id to the {@link StreamsQuery} that is represented by that id based on what * is already in a {@link QueryChangeLog}. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java index 824eebc..5765366 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java @@ -26,7 +26,7 @@ import info.aduna.iteration.CloseableIteration; * An ordered log of all of the changes that have been applied to the SPARQL Queries that are managed by Rya Streams. */ @DefaultAnnotation(NonNull.class) -public interface QueryChangeLog { +public interface QueryChangeLog extends AutoCloseable { /** * Write a new {@link QueryChange} to the end of the change log. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java index 54f98fc..850b2bc 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java @@ -31,7 +31,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Repository for adding, deleting, and listing active queries in Rya Streams. */ @DefaultAnnotation(NonNull.class) -public interface QueryRepository { +public interface QueryRepository extends AutoCloseable { /** * Adds a new query to Rya Streams. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java index 93df2ae..5c0816f 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java @@ -26,9 +26,11 @@ import java.util.Set; import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException; import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException; +import org.apache.rya.streams.client.command.AddQueryCommand; +import org.apache.rya.streams.client.command.DeleteQueryCommand; +import org.apache.rya.streams.client.command.ListQueriesCommand; import org.apache.rya.streams.client.command.LoadStatementsCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rya.streams.client.command.StreamResultsCommand; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -37,23 +39,31 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; /** - * CLI tool for interacting with rya streams. - * <p> + * CLI tool for interacting with Rya Streams. + * </p> * This tool can be used to: * <ul> - * <li>Load a file of statements into rya streams</li> + * <li>Add a Query to Rya Streams</li> + * <li>Delete a Query from Rya Streams</li> + * <li>List the Queries that are being managed by Rya Streams</li> + * <li>Load a file of RDF Statements into Rya Streams</li> + * <li>Stream the results of a Query to the console</li> * </ul> */ @DefaultAnnotation(NonNull.class) public class CLIDriver { - private static final Logger LOG = LoggerFactory.getLogger(CLIDriver.class); + /** * Maps from command strings to the object that performs the command. */ private static final ImmutableMap<String, RyaStreamsCommand> COMMANDS; static { final Set<Class<? extends RyaStreamsCommand>> commandClasses = new HashSet<>(); + commandClasses.add(AddQueryCommand.class); + commandClasses.add(DeleteQueryCommand.class); + commandClasses.add(ListQueriesCommand.class); commandClasses.add(LoadStatementsCommand.class); + commandClasses.add(StreamResultsCommand.class); final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder(); for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) { try { @@ -70,10 +80,7 @@ public class CLIDriver { private static final String USAGE = makeUsage(COMMANDS); public static void main(final String[] args) { - LOG.trace("Starting up the Rya Streams Client."); - - // If no command provided or the command isn't recognized, then print - // the usage. + // If no command provided or the command isn't recognized, then print the usage. if (args.length == 0 || !COMMANDS.containsKey(args[0])) { System.out.println(USAGE); System.exit(1); @@ -84,14 +91,19 @@ public class CLIDriver { final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); final RyaStreamsCommand streamsCommand = COMMANDS.get(command); + // Print usage if the arguments are invalid for the command. + if(!streamsCommand.validArguments(commandArgs)) { + System.out.println(streamsCommand.getUsage()); + System.exit(1); + } + // Execute the command. try { streamsCommand.execute(commandArgs); } catch (ArgumentsException | ExecutionException e) { - LOG.error("The command: " + command + " failed to execute properly.", e); + System.err.println("The command: " + command + " failed to execute properly."); + e.printStackTrace(); System.exit(2); - } finally { - LOG.trace("Shutting down the Rya Streams Client."); } } @@ -119,4 +131,4 @@ public class CLIDriver { return usage.toString(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java index 5d64785..5b05d0a 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java @@ -89,6 +89,14 @@ public interface RyaStreamsCommand { } /** + * Validates a set of arguments that may be passed into the command. + * + * @param args - The arguments that will be validated. (not null) + * @return {@code true} if the arguments are valid, otherwise {@code false}. + */ + public boolean validArguments(String[] args); + + /** * Execute the command using the command line arguments. * * @param args - Command line arguments that configure how the command will execute. (not null) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java index 8439f20..c72e6a2 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java @@ -20,32 +20,16 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; -import java.util.Properties; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.AddQuery; import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; -import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -60,7 +44,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public class AddQueryCommand implements RyaStreamsCommand { - private static final Logger log = LoggerFactory.getLogger(AddQueryCommand.class); /** * Command line parameters that are used by this command to configure itself. @@ -73,11 +56,9 @@ public class AddQueryCommand implements RyaStreamsCommand { public String toString() { final StringBuilder parameters = new StringBuilder(); parameters.append(super.toString()); - parameters.append("\n"); if (!Strings.isNullOrEmpty(query)) { - parameters.append("\tQuery: " + query); - parameters.append("\n"); + parameters.append("\tQuery: " + query + "\n"); } return parameters.toString(); } @@ -103,6 +84,17 @@ public class AddQueryCommand implements RyaStreamsCommand { } @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new AddParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override public void execute(final String[] args) throws ArgumentsException, ExecutionException { requireNonNull(args); @@ -113,34 +105,27 @@ public class AddQueryCommand implements RyaStreamsCommand { } catch(final ParameterException e) { throw new ArgumentsException("Could not add a new query because of invalid command line parameters.", e); } - log.trace("Executing the Add Query Command\n" + params.toString()); - // Create properties for interacting with Kafka. - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); - final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); - final QueryRepository repo = new InMemoryQueryRepository(changeLog); + // Create the Kafka backed QueryChangeLog. + final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort; + final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); + final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); // Execute the add query command. - final AddQuery addQuery = new DefaultAddQuery(repo); - try { - final StreamsQuery query = addQuery.addQuery(params.query); - log.trace("Added query: " + query.getSparql()); - } catch (final RyaStreamsException e) { - log.error("Unable to parse query: " + params.query, e); + try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + final AddQuery addQuery = new DefaultAddQuery(queryRepo); + try { + final StreamsQuery query = addQuery.addQuery(params.query); + System.out.println("Added query: " + query.getSparql()); + } catch (final RyaStreamsException e) { + System.err.println("Unable to parse query: " + params.query); + e.printStackTrace(); + System.exit(1); + } + } catch (final Exception e) { + System.err.println("Problem encountered while closing the QueryRepository."); + e.printStackTrace(); + System.exit(1); } - - log.trace("Finished executing the Add Query Command."); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java index b101a0f..2aeb90c 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java @@ -20,32 +20,17 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; -import java.util.Properties; import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.DeleteQuery; import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; -import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -60,7 +45,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public class DeleteQueryCommand implements RyaStreamsCommand { - private static final Logger log = LoggerFactory.getLogger(DeleteQueryCommand.class); /** * Command line parameters that are used by this command to configure itself. @@ -102,6 +86,17 @@ public class DeleteQueryCommand implements RyaStreamsCommand { } @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new RemoveParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override public void execute(final String[] args) throws ArgumentsException, ExecutionException { requireNonNull(args); @@ -112,34 +107,27 @@ public class DeleteQueryCommand implements RyaStreamsCommand { } catch(final ParameterException e) { throw new ArgumentsException("Could not add a new query because of invalid command line parameters.", e); } - log.trace("Executing the Add Query Command\n" + params.toString()); - // Create properties for interacting with Kafka. - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); - final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); - final QueryRepository repo = new InMemoryQueryRepository(changeLog); + // Create the Kafka backed QueryChangeLog. + final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort; + final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); + final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); // Execute the delete query command. - final DeleteQuery deleteQuery = new DefaultDeleteQuery(repo); - try { - deleteQuery.delete(UUID.fromString(params.queryId)); - log.trace("Deleted query: " + params.queryId); - } catch (final RyaStreamsException e) { - log.error("Unable to delete query with ID: " + params.queryId, e); + try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo); + try { + deleteQuery.delete(UUID.fromString(params.queryId)); + System.out.println("Deleted query: " + params.queryId); + } catch (final RyaStreamsException e) { + System.err.println("Unable to delete query with ID: " + params.queryId); + e.printStackTrace(); + System.exit(1); + } + } catch (final Exception e) { + System.err.println("Problem encountered while closing the QueryRepository."); + e.printStackTrace(); + System.exit(1); } - - log.trace("Finished executing the Delete Query Command."); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java index c4e5de6..670007b 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java @@ -20,33 +20,18 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; -import java.util.Properties; import java.util.Set; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.ListQueries; import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; -import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; -import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; @@ -59,7 +44,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public class ListQueriesCommand implements RyaStreamsCommand { - private static final Logger log = LoggerFactory.getLogger(ListQueriesCommand.class); @Override public String getCommand() { @@ -72,6 +56,17 @@ public class ListQueriesCommand implements RyaStreamsCommand { } @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new KafkaParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override public void execute(final String[] args) throws ArgumentsException, ExecutionException { requireNonNull(args); @@ -82,32 +77,27 @@ public class ListQueriesCommand implements RyaStreamsCommand { } catch (final ParameterException e) { throw new ArgumentsException("Could not list the queries because of invalid command line parameters.", e); } - log.trace("Executing the List Query Command.\n" + params.toString()); - // Create properties for interacting with Kafka. - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); - final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); - - final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance)); - final QueryRepository repo = new InMemoryQueryRepository(changeLog); + // Create the Kafka backed QueryChangeLog. + final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort; + final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); + final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); // Execute the list queries command. - final ListQueries listQueries = new DefaultListQueries(repo); - try { - final Set<StreamsQuery> queries = listQueries.all(); - System.out.println( formatQueries(queries) ); - } catch (final RyaStreamsException e) { - log.error("Unable to retrieve the queries.", e); + try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + final ListQueries listQueries = new DefaultListQueries(queryRepo); + try { + final Set<StreamsQuery> queries = listQueries.all(); + System.out.println( formatQueries(queries) ); + } catch (final RyaStreamsException e) { + System.err.println("Unable to retrieve the queries."); + e.printStackTrace(); + System.exit(1); + } + } catch (final Exception e) { + System.err.println("Problem encountered while closing the QueryRepository."); + e.printStackTrace(); + System.exit(1); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java index 4763bd8..6ae63da 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java @@ -34,8 +34,6 @@ import org.apache.rya.streams.client.RyaStreamsCommand; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -51,7 +49,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public class LoadStatementsCommand implements RyaStreamsCommand { - private static final Logger log = LoggerFactory.getLogger(LoadStatementsCommand.class); /** * Command line parameters that are used by this command to configure itself. @@ -90,7 +87,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand { @Override public String getDescription() { - return "Load RDF Statements into Rya Streams"; + return "Load RDF Statements into Rya Streams."; } @Override @@ -103,10 +100,20 @@ public class LoadStatementsCommand implements RyaStreamsCommand { } @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new LoadStatementsParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override public void execute(final String[] args) throws ArgumentsException, ExecutionException { requireNonNull(args); - // Parse the command line arguments. final LoadStatementsParameters params = new LoadStatementsParameters(); try { @@ -114,9 +121,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand { } catch(final ParameterException e) { throw new ArgumentsException("Could not load the Statements file because of invalid command line parameters.", e); } - log.trace("Executing the Load Statements Command\n" + params.toString()); - log.trace("Loading Statements from the file '" + params.statementsFile + "'."); final Path statementsPath = Paths.get(params.statementsFile); final Properties producerProps = buildProperties(params); @@ -124,10 +129,8 @@ public class LoadStatementsCommand implements RyaStreamsCommand { final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer); statements.load(statementsPath, params.visibilities); } catch (final Exception e) { - log.error("Unable to parse statements file: " + statementsPath.toString(), e); + System.err.println("Unable to parse statements file: " + statementsPath.toString()); } - - log.trace("Finished executing the Load Statements Command."); } private Properties buildProperties(final LoadStatementsParameters params) { @@ -138,4 +141,4 @@ public class LoadStatementsCommand implements RyaStreamsCommand { props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName()); return props; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java new file mode 100644 index 0000000..9de978b --- /dev/null +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.command; + +import static java.util.Objects.requireNonNull; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.base.Strings; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A command that streams the results of a query to the console. + */ +@DefaultAnnotation(NonNull.class) +public class StreamResultsCommand implements RyaStreamsCommand { + + /** + * Command line parameters that are used by this command to configure itself. + */ + private static final class StreamResultsParameters extends RyaStreamsCommand.KafkaParameters { + + @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.") + private String queryId; + + @Override + public String toString() { + final StringBuilder parameters = new StringBuilder(); + parameters.append(super.toString()); + + if (!Strings.isNullOrEmpty(queryId)) { + parameters.append("\tQuery ID: " + queryId); + parameters.append("\n"); + } + + return parameters.toString(); + } + } + + @Override + public String getCommand() { + return "stream-results"; + } + + @Override + public String getDescription() { + return "Stream the results of a query to the console."; + } + + @Override + public String getUsage() { + final JCommander parser = new JCommander(new StreamResultsParameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new StreamResultsParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final StreamResultsParameters params = new StreamResultsParameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e); + } + + final UUID queryId; + try { + queryId = UUID.fromString( params.queryId ); + } catch(final IllegalArgumentException e) { + throw new ArgumentsException("Invalid Query ID " + params.queryId); + } + + // This command executes until the application is killed, so create a kill boolean. + final AtomicBoolean finished = new AtomicBoolean(false); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + finished.set(true); + } + }); + + // Execute the command. + final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort); + + try (final QueryResultStream stream = getQueryResultStream.fromNow(queryId)) { + while(!finished.get()) { + for(final VisibilityBindingSet visBs : stream.poll(1000)) { + System.out.println(visBs); + } + } + } catch (final Exception e) { + System.err.println("Error while reading the results from the stream."); + e.printStackTrace(); + System.exit(1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/resources/log4j.properties b/extras/rya.streams/client/src/main/resources/log4j.properties new file mode 100644 index 0000000..b07468c --- /dev/null +++ b/extras/rya.streams/client/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java index 09e874c..ee4378e 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java @@ -42,7 +42,6 @@ import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.After; import org.junit.Before; @@ -52,7 +51,7 @@ import org.junit.Test; /** * integration Test for adding a new query through a command. */ -public class AddQueryCommandIT extends KafkaITBase { +public class AddQueryCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java index 0079371..c5dad3d 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java @@ -43,7 +43,6 @@ import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.After; import org.junit.Before; @@ -53,7 +52,7 @@ import org.junit.Test; /** * Integration Test for deleting a query from Rya Streams through a command. */ -public class DeleteQueryCommandIT extends KafkaITBase { +public class DeleteQueryCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java index eb759ba..b32967e 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -38,7 +38,6 @@ import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; -import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.After; import org.junit.Before; @@ -48,7 +47,7 @@ import org.junit.Test; /** * integration Test for listing queries through a command. */ -public class ListQueryCommandIT extends KafkaITBase { +public class ListQueryCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java index 19622ae..9403e4b 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java @@ -23,11 +23,14 @@ import static java.util.Objects.requireNonNull; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.rya.streams.api.queries.ChangeLogEntry; import org.apache.rya.streams.api.queries.QueryChange; @@ -76,7 +79,17 @@ public class KafkaQueryChangeLog implements QueryChangeLog { @Override public void write(final QueryChange newChange) throws QueryChangeLogException { requireNonNull(newChange); - producer.send(new ProducerRecord<>(topic, newChange)); + + // Write the change to the log immediately. + final Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, newChange)); + producer.flush(); + + // Don't return until the write has been completed. + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new QueryChangeLogException("Could not record a new query change to the Kafka Query Change Log.", e); + } } @Override @@ -96,6 +109,15 @@ public class KafkaQueryChangeLog implements QueryChangeLog { } /** + * Closing this class will also close the {@link Producer} and {@link Consumer} that were passed into it. + */ + @Override + public void close() throws Exception { + producer.close(); + consumer.close(); + } + + /** * A {@link CloseableIteration} to iterate over a consumer's results. Since * the consumer returns in bulk when poll(), a cache of recent polling is * maintained. @@ -149,10 +171,10 @@ public class KafkaQueryChangeLog implements QueryChangeLog { final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L); final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>(); records.forEach( - record -> - changes.add(new ChangeLogEntry<QueryChange>(record.offset(), record.value())) + record -> + changes.add(new ChangeLogEntry<>(record.offset(), record.value())) ); iterCache = changes.iterator(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f3ac7df1/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java new file mode 100644 index 0000000..5042b30 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.queries; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates instances of {@link KafkaQueryChangeLog}. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaQueryChangeLogFactory { + + /** + * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link Producer} and {@link Consumer}. + * + * @param bootstrapServers - Indicates which instance of Kafka that will be connected to. (not null) + * @param topic - The topic the QueryChangeLog is persisted to. (not null) + * @return A new instance of {@link KafkaQueryChangeLog}. + */ + public static KafkaQueryChangeLog make( + final String bootstrapServers, + final String topic) { + requireNonNull(bootstrapServers); + requireNonNull(topic); + + final Properties producerProperties = new Properties(); + producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + + final Producer<?, QueryChange> producer = new KafkaProducer<>(producerProperties); + final Consumer<?, QueryChange> consumer = new KafkaConsumer<>(consumerProperties); + return new KafkaQueryChangeLog(producer, consumer, topic); + } +} \ No newline at end of file