RYA-377 Implement a command for running a Rya Streams query out of the command line client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/94423229 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/94423229 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/94423229 Branch: refs/heads/master Commit: 94423229ebe7b34e0fb6c17fbe022e080cfe79d9 Parents: a5e3618 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Nov 14 18:32:53 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/api/interactor/RunQuery.java | 41 ++++ .../api/queries/InMemoryQueryRepository.java | 21 +- .../streams/api/queries/QueryRepository.java | 14 +- .../queries/InMemoryQueryRepositoryTest.java | 121 ++++++----- .../apache/rya/streams/client/CLIDriver.java | 2 + .../client/command/LoadStatementsCommand.java | 1 + .../streams/client/command/RunQueryCommand.java | 155 ++++++++++++++ .../client/command/StreamResultsCommand.java | 2 +- .../client/command/AddQueryCommandIT.java | 55 ++--- .../client/command/DeleteQueryCommandIT.java | 183 +++++++--------- .../client/command/ListQueryCommandIT.java | 56 ++--- .../client/command/LoadStatementsCommandIT.java | 78 ++----- .../client/command/RunQueryCommandIT.java | 196 +++++++++++++++++ extras/rya.streams/kafka/pom.xml | 10 + .../apache/rya/streams/kafka/KafkaTopics.java | 39 ++++ .../kafka/interactor/KafkaLoadStatements.java | 5 +- .../streams/kafka/interactor/KafkaRunQuery.java | 136 ++++++++++++ .../processors/join/KeyValueJoinStateStore.java | 5 +- .../apache/rya/streams/kafka/KafkaTestUtil.java | 211 ------------------- .../rya/streams/kafka/RyaStreamsTestUtil.java | 124 +++++++++++ .../interactor/KafkaGetQueryResultStreamIT.java | 2 +- .../kafka/interactor/KafkaLoadStatementsIT.java | 5 +- .../kafka/interactor/KafkaRunQueryIT.java | 170 +++++++++++++++ .../processors/StatementPatternProcessorIT.java | 10 +- .../processors/filter/FilterProcessorIT.java | 4 +- .../kafka/processors/join/JoinProcessorIT.java | 12 +- .../projection/MultiProjectionProcessorIT.java | 4 +- .../projection/ProjectionProcessorIT.java | 4 +- .../kafka/queries/KafkaQueryChangeLogIT.java | 2 +- .../VisibilityBindingSetKafkaIT.java | 2 +- .../VisibilityStatementKafkaIT.java | 2 +- test/kafka/pom.xml | 5 + .../rya/test/kafka/KafkaTestInstanceRule.java | 7 + .../apache/rya/test/kafka/KafkaTestUtil.java | 126 +++++++++++ 34 files changed, 1269 insertions(+), 541 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java new file mode 100644 index 0000000..7f47095 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor; + +import java.util.UUID; + +import org.apache.rya.streams.api.exception.RyaStreamsException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Runs a Rya Streams processing topology on the machine this class is invoked on. + */ +@DefaultAnnotation(NonNull.class) +public interface RunQuery { + + /** + * Runs the specified query on the machine this method was invoked on. + * + * @param queryId - The id of the query that will be processed. (not null) + * @throws RyaStreamsException The query could not processed. + */ + public void run(UUID queryId) throws RyaStreamsException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index c1048fc..80678de 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; @@ -63,13 +64,7 @@ public class InMemoryQueryRepository implements QueryRepository { this.changeLog = requireNonNull(changeLog); // Lazily initialize the queries cache the first time you try to use it. - queriesCache = Suppliers.memoize(new Supplier<Map<UUID, StreamsQuery>>() { - @Override - public Map<UUID, StreamsQuery> get() { - // Initialize the queries cache using the current state of the change log. - return initializeCache(changeLog); - } - }); + queriesCache = Suppliers.memoize(() -> initializeCache(changeLog)); } @Override @@ -98,6 +93,18 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override + public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException { + requireNonNull(queryId); + + lock.lock(); + try { + return Optional.ofNullable( queriesCache.get().get(queryId) ); + } finally { + lock.unlock(); + } + } + + @Override public void delete(final UUID queryId) throws QueryRepositoryException { requireNonNull(queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java index 850b2bc..7269588 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java @@ -18,6 +18,7 @@ */ package org.apache.rya.streams.api.queries; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -32,6 +33,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public interface QueryRepository extends AutoCloseable { + /** * Adds a new query to Rya Streams. * @@ -42,6 +44,15 @@ public interface QueryRepository extends AutoCloseable { public StreamsQuery add(final String query) throws QueryRepositoryException; /** + * Get an existing query from Rya Streams. + * + * @param queryId - Identifies which query will be fetched. + * @return the {@link StreamsQuery} for the id if one exists; otherwise empty. + * @throws QueryRepositoryException The query could not be fetched. + */ + public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException; + + /** * Removes an existing query from Rya Streams. * * @param queryID - The {@link UUID} of the query to remove. (not null) @@ -53,8 +64,7 @@ public interface QueryRepository extends AutoCloseable { * Lists all existing queries in Rya Streams. * * @return - A List of the current {@link StreamsQuery}s - * @throws QueryRepositoryException The {@link StreamsQuery}s could not be - * listed. + * @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed. */ public Set<StreamsQuery> list() throws QueryRepositoryException; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 25cbab2..92193ca 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -19,16 +19,17 @@ package org.apache.rya.streams.api.queries; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.UUID; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; -import org.apache.rya.streams.api.queries.QueryRepository.QueryRepositoryException; import org.junit.Test; /** @@ -37,69 +38,95 @@ import org.junit.Test; public class InMemoryQueryRepositoryTest { @Test - public void canReadAddedQueries() throws QueryRepositoryException { + public void canReadAddedQueries() throws Exception { // Setup a totally in memory QueryRepository. - final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() ); - - // Add some queries to it. - final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - expected.add( queries.add("query 2") ); - expected.add( queries.add("query 3") ); - - // Show they are in the list of all queries. - final Set<StreamsQuery> stored = queries.list(); - assertEquals(expected, stored); + try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + // Add some queries to it. + final Set<StreamsQuery> expected = new HashSet<>(); + expected.add( queries.add("query 1") ); + expected.add( queries.add("query 2") ); + expected.add( queries.add("query 3") ); + + // Show they are in the list of all queries. + final Set<StreamsQuery> stored = queries.list(); + assertEquals(expected, stored); + } } @Test - public void deletedQueriesDisappear() throws QueryRepositoryException { + public void deletedQueriesDisappear() throws Exception { // Setup a totally in memory QueryRepository. - final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() ); - - // Add some queries to it. The second one we will delete. - final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - final UUID deletedMeId = queries.add("query 2").getQueryId(); - expected.add( queries.add("query 3") ); - - // Delete the second query. - queries.delete( deletedMeId ); - - // Show only queries 1 and 3 are in the list. - final Set<StreamsQuery> stored = queries.list(); - assertEquals(expected, stored); + try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + // Add some queries to it. The second one we will delete. + final Set<StreamsQuery> expected = new HashSet<>(); + expected.add( queries.add("query 1") ); + final UUID deletedMeId = queries.add("query 2").getQueryId(); + expected.add( queries.add("query 3") ); + + // Delete the second query. + queries.delete( deletedMeId ); + + // Show only queries 1 and 3 are in the list. + final Set<StreamsQuery> stored = queries.list(); + assertEquals(expected, stored); + } } @Test - public void initializedWithPopulatedChnageLog() throws QueryRepositoryException { + public void initializedWithPopulatedChnageLog() throws Exception { // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later. final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); - final QueryRepository queries = new InMemoryQueryRepository( changeLog ); - - // Add some queries and deletes to it. - final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - final UUID deletedMeId = queries.add("query 2").getQueryId(); - expected.add( queries.add("query 3") ); - queries.delete( deletedMeId ); - - // Create a new totally in memory QueryRepository. - final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog ); - - // Listing the queries should work using an initialized change log. - final Set<StreamsQuery> stored = initializedQueries.list(); - assertEquals(expected, stored); + try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) { + // Add some queries and deletes to it. + final Set<StreamsQuery> expected = new HashSet<>(); + expected.add( queries.add("query 1") ); + final UUID deletedMeId = queries.add("query 2").getQueryId(); + expected.add( queries.add("query 3") ); + queries.delete( deletedMeId ); + + // Create a new totally in memory QueryRepository. + try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { + // Listing the queries should work using an initialized change log. + final Set<StreamsQuery> stored = initializedQueries.list(); + assertEquals(expected, stored); + } + } } @Test(expected = RuntimeException.class) - public void changeLogThrowsExceptions() throws QueryChangeLogException, QueryRepositoryException { + public void changeLogThrowsExceptions() throws Exception { // Create a mock change log that throws an exception when you try to list what is in it. final QueryChangeLog changeLog = mock(QueryChangeLog.class); when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception.")); // Create the QueryRepository and invoke one of the methods. - final QueryRepository queries = new InMemoryQueryRepository( changeLog ); - queries.list(); + try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) { + queries.list(); + } + } + + @Test + public void get_present() throws Exception { + // Setup a totally in memory QueryRepository. + try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + // Add a query to it. + final StreamsQuery query = queries.add("query 1"); + + // Show the fetched query matches the expected ones. + final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); + assertEquals(query, fetched.get()); + } + } + + @Test + public void get_notPresent() throws Exception { + // Setup a totally in memory QueryRepository. + try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + // Fetch a query that was never added to the repository. + final Optional<StreamsQuery> query = queries.get(UUID.randomUUID()); + + // Show it could not be found. + assertFalse(query.isPresent()); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java index 5c0816f..05e75d9 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java @@ -30,6 +30,7 @@ import org.apache.rya.streams.client.command.AddQueryCommand; import org.apache.rya.streams.client.command.DeleteQueryCommand; import org.apache.rya.streams.client.command.ListQueriesCommand; import org.apache.rya.streams.client.command.LoadStatementsCommand; +import org.apache.rya.streams.client.command.RunQueryCommand; import org.apache.rya.streams.client.command.StreamResultsCommand; import com.google.common.collect.ImmutableMap; @@ -63,6 +64,7 @@ public class CLIDriver { commandClasses.add(DeleteQueryCommand.class); commandClasses.add(ListQueriesCommand.class); commandClasses.add(LoadStatementsCommand.class); + commandClasses.add(RunQueryCommand.class); commandClasses.add(StreamResultsCommand.class); final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder(); for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java index 9414b28..42020b3 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java @@ -127,6 +127,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand { final Properties producerProps = buildProperties(params); try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) { final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer); + System.out.printf("Loading statements from file `%s` using visibilities `%s`.\n", statementsPath, params.visibilities); statements.fromFile(statementsPath, params.visibilities); } catch (final Exception e) { System.err.println("Unable to parse statements file: " + statementsPath.toString()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java new file mode 100644 index 0000000..8f7f162 --- /dev/null +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.command; + +import static java.util.Objects.requireNonNull; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.interactor.KafkaRunQuery; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.kafka.topology.TopologyFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.base.Strings; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A command that runs a Rya Streams processing topology on the node the client is executed on until it has finished. + */ +@DefaultAnnotation(NonNull.class) +public class RunQueryCommand implements RyaStreamsCommand { + + private class RunParameters extends RyaStreamsCommand.KafkaParameters { + @Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to run.") + private String queryId; + + @Parameter(names = {"--zookeepers", "-z"}, required = true, description = "The servers that Zookeeper runs on.") + private String zookeeperServers; + + @Override + public String toString() { + final StringBuilder parameters = new StringBuilder(); + parameters.append(super.toString()); + + if (!Strings.isNullOrEmpty(queryId)) { + parameters.append("\tQueryID: " + queryId); + parameters.append("\n"); + } + return parameters.toString(); + } + } + + @Override + public String getCommand() { + return "run-query"; + } + + @Override + public String getDescription() { + return "Runs a Rya Streams query until the command is killed. This command also creates the input and output " + + "topics required to execute the query."; + } + + @Override + public String getUsage() { + final JCommander parser = new JCommander(new RunParameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new RunParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final RunParameters params = new RunParameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not add a new query because of invalid command line parameters.", e); + } + + // Create the Kafka backed QueryChangeLog. + final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort; + final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); + final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + + // Look up the query to be executed from the change log. + try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { + final UUID queryId = UUID.fromString( params.queryId ); + final Optional<StreamsQuery> query = queryRepo.get(queryId); + + if(!query.isPresent()) { + throw new ArgumentsException("There is no registered query for queryId " + params.queryId); + } + + // Make sure the topics required by the application exists for the specified Rya instances. + final Set<String> topics = new HashSet<>(); + topics.add( KafkaTopics.statementsTopic(params.ryaInstance) ); + topics.add( KafkaTopics.queryResultsTopic(queryId) ); + KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1); + + // Run the query that uses those topics. + final KafkaRunQuery runQuery = new KafkaRunQuery( + params.kafkaIP, + params.kafkaPort, + KafkaTopics.statementsTopic(params.ryaInstance), + KafkaTopics.queryResultsTopic(queryId), + queryRepo, + new TopologyFactory()); + runQuery.run(queryId); + } catch(final Exception e) { + throw new ExecutionException("Could not execute the Run Query command.", e); + } + } catch(final ArgumentsException | ExecutionException e) { + // Rethrow the exceptions that are advertised by execute. + throw e; + } catch (final Exception e) { + throw new ExecutionException("Problem encountered while closing the QueryRepository.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java index 9de978b..64f78a3 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -126,7 +126,7 @@ public class StreamResultsCommand implements RyaStreamsCommand { // Execute the command. final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort); - try (final QueryResultStream stream = getQueryResultStream.fromNow(queryId)) { + try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) { while(!finished.get()) { for(final VisibilityBindingSet visBs : stream.poll(1000)) { System.out.println(visBs); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java index ee4378e..3a412d2 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java @@ -20,17 +20,11 @@ package org.apache.rya.streams.client.command; import static org.junit.Assert.assertEquals; -import java.util.Properties; import java.util.Set; import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.entity.StreamsQuery; @@ -43,6 +37,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -54,49 +49,27 @@ import org.junit.Test; public class AddQueryCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); - - private String kafkaIp; - private String kafkaPort; private QueryRepository queryRepo; - private Producer<?, QueryChange> queryProducer = null; - private Consumer<?, QueryChange> queryConsumer = null; - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); @Before public void setup() { - final Properties props = rule.createBootstrapServerConfig(); - final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - final String[] tokens = location.split(":"); - - kafkaIp = tokens[0]; - kafkaPort = tokens[1]; - - // Initialize the QueryRepository. - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - queryProducer = new KafkaProducer<>(producerProperties); - queryConsumer = new KafkaConsumer<>(consumerProperties); - + // Make sure the topic that the change log uses exists. final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + kafka.createTopic(changeLogTopic); + + // Setup the QueryRepository used by the test. + final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); queryRepo = new InMemoryQueryRepository(changeLog); } @After - public void cleanup() { - queryProducer.close(); - queryConsumer.close(); + public void cleanup() throws Exception { + queryRepo.close(); } @Test @@ -105,8 +78,8 @@ public class AddQueryCommandIT { final String query = "SELECT * WHERE { ?person <urn:name> ?name }"; final String[] args = new String[] { "-r", "" + ryaInstance, - "-i", kafkaIp, - "-p", kafkaPort, + "-i", kafka.getKafkaHostname(), + "-p", kafka.getKafkaPort(), "-q", query }; @@ -126,8 +99,8 @@ public class AddQueryCommandIT { final String query = "SELECT * WHERE { ?person <urn:name> ?name }"; final String[] args = new String[] { "--ryaInstance", "" + ryaInstance, - "--kafkaHostname", kafkaIp, - "--kafkaPort", kafkaPort, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), "--query", query }; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java index c5dad3d..91647f2 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java @@ -18,20 +18,15 @@ */ package org.apache.rya.streams.client.command; +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import java.util.Properties; import java.util.Set; import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.entity.StreamsQuery; @@ -44,8 +39,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.After; -import org.junit.Before; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; @@ -54,124 +48,103 @@ import org.junit.Test; */ public class DeleteQueryCommandIT { - private final String ryaInstance = UUID.randomUUID().toString(); - - private String kafkaIp; - private String kafkaPort; - - private Producer<?, QueryChange> queryProducer = null; - private Consumer<?, QueryChange> queryConsumer = null; - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); - - @Before - public void setup() { - final Properties props = rule.createBootstrapServerConfig(); - final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - final String[] tokens = location.split(":"); - - kafkaIp = tokens[0]; - kafkaPort = tokens[1]; - } + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); /** * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need * to re-create the repo outside of the command to ensure it has the most up to date values inside of it. + * + * @param ryaInstance - The rya instance the repository is connected to. (not null) + * @param createTopic - Set this to true if the topic doesn't exist yet. */ - private QueryRepository makeQueryRepository() { - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - cleanup(); - queryProducer = new KafkaProducer<>(producerProperties); - queryConsumer = new KafkaConsumer<>(consumerProperties); + private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) { + requireNonNull(ryaInstance); + // Make sure the topic that the change log uses exists. final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + if(createTopic) { + kafka.createTopic(changeLogTopic); + } + + // Setup the QueryRepository used by the test. + final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); return new InMemoryQueryRepository(changeLog); } - @After - public void cleanup() { - if(queryProducer != null) { - queryProducer.close(); - } - if(queryConsumer != null) { - queryConsumer.close(); - } - } - @Test public void shortParams() throws Exception { + final String ryaInstance = UUID.randomUUID().toString(); + // Add a few queries to Rya Streams. - QueryRepository repo = makeQueryRepository(); - repo.add("query1"); - final UUID query2Id = repo.add("query2").getQueryId(); - repo.add("query3"); - - // Show that all three of the queries were added. - Set<StreamsQuery> queries = repo.list(); - assertEquals(3, queries.size()); - - // Delete query 2 using the delete query command. - final String[] deleteArgs = new String[] { - "-r", "" + ryaInstance, - "-i", kafkaIp, - "-p", kafkaPort, - "-q", query2Id.toString() - }; - - final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); - deleteCommand.execute(deleteArgs); - - // Show query2 was deleted. - repo = makeQueryRepository(); - queries = repo.list(); - assertEquals(2, queries.size()); - - for(final StreamsQuery query : queries) { - assertNotEquals(query2Id, query.getQueryId()); + try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) { + repo.add("query1"); + final UUID query2Id = repo.add("query2").getQueryId(); + repo.add("query3"); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = repo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "-r", "" + ryaInstance, + "-i", kafka.getKafkaHostname(), + "-p", kafka.getKafkaPort(), + "-q", query2Id.toString() + }; + + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) { + queries = repo2.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); + } + } } } @Test public void longParams() throws Exception { + final String ryaInstance = UUID.randomUUID().toString(); + // Add a few queries to Rya Streams. - QueryRepository repo = makeQueryRepository(); - repo.add("query1"); - final UUID query2Id = repo.add("query2").getQueryId(); - repo.add("query3"); - - // Show that all three of the queries were added. - Set<StreamsQuery> queries = repo.list(); - assertEquals(3, queries.size()); - - // Delete query 2 using the delete query command. - final String[] deleteArgs = new String[] { - "--ryaInstance", "" + ryaInstance, - "--kafkaHostname", kafkaIp, - "--kafkaPort", kafkaPort, - "--queryID", query2Id.toString() - }; - - final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); - deleteCommand.execute(deleteArgs); - - // Show query2 was deleted. - repo = makeQueryRepository(); - queries = repo.list(); - assertEquals(2, queries.size()); - - for(final StreamsQuery query : queries) { - assertNotEquals(query2Id, query.getQueryId()); + try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) { + repo.add("query1"); + final UUID query2Id = repo.add("query2").getQueryId(); + repo.add("query3"); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = repo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), + "--queryID", query2Id.toString() + }; + + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) { + queries = repo2.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); + } + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java index b32967e..00b4ce0 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -18,16 +18,10 @@ */ package org.apache.rya.streams.client.command; -import java.util.Properties; import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; @@ -39,6 +33,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -50,52 +45,29 @@ import org.junit.Test; public class ListQueryCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); - - private String kafkaIp; - private String kafkaPort; private QueryRepository queryRepo; - private Producer<?, QueryChange> queryProducer = null; - private Consumer<?, QueryChange> queryConsumer = null; - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); @Before public void setup() { - final Properties props = rule.createBootstrapServerConfig(); - final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - final String[] tokens = location.split(":"); - - kafkaIp = tokens[0]; - kafkaPort = tokens[1]; - - // Initialize the QueryRepository. - final Properties producerProperties = new Properties(); - producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = new Properties(); - consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - - queryProducer = new KafkaProducer<>(producerProperties); - queryConsumer = new KafkaConsumer<>(consumerProperties); - + // Make sure the topic that the change log uses exists. final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + kafka.createTopic(changeLogTopic); + + // Setup the QueryRepository used by the test. + final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); queryRepo = new InMemoryQueryRepository(changeLog); } @After - public void cleanup() { - queryProducer.close(); - queryConsumer.close(); + public void cleanup() throws Exception { + queryRepo.close(); } - @Test public void shortParams() throws Exception { // Add a few queries to Rya Streams. @@ -106,8 +78,8 @@ public class ListQueryCommandIT { // Execute the List Queries command. final String[] args = new String[] { "-r", "" + ryaInstance, - "-i", kafkaIp, - "-p", kafkaPort + "-i", kafka.getKafkaHostname(), + "-p", kafka.getKafkaPort() }; final ListQueriesCommand command = new ListQueriesCommand(); @@ -124,8 +96,8 @@ public class ListQueryCommandIT { // Execute the List Queries command. final String[] args = new String[] { "--ryaInstance", "" + ryaInstance, - "--kafkaHostname", kafkaIp, - "--kafkaPort", kafkaPort + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort() }; final ListQueriesCommand command = new ListQueriesCommand(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java index 95a4876..03c31b4 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java @@ -26,21 +26,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Properties; import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Before; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; @@ -55,21 +50,8 @@ public class LoadStatementsCommandIT { private final String ryaInstance = UUID.randomUUID().toString(); - private String kafkaIp; - private String kafkaPort; - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); - - @Before - public void setup() { - final Properties props = rule.createBootstrapServerConfig(); - final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - final String[] tokens = location.split(":"); - - kafkaIp = tokens[0]; - kafkaPort = tokens[1]; - } + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); @Test public void shortParams() throws Exception { @@ -77,35 +59,27 @@ public class LoadStatementsCommandIT { final String visibilities = "a|b|c"; final String[] args = new String[] { "-r", "" + ryaInstance, - "-i", kafkaIp, - "-p", kafkaPort, + "-i", kafka.getKafkaHostname(), + "-p", kafka.getKafkaPort(), "-f", TURTLE_FILE.toString(), "-v", visibilities }; + // Load the file of statements into the Statements topic. new LoadStatementsCommand().execute(args); // Show that the statements were loaded into the topic. - // Read a VisibilityBindingSet from the test topic. final List<VisibilityStatement> read = new ArrayList<>(); - final Properties consumerProps = new Properties(); - consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); - - try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { - final String topic = KafkaTopics.statementsTopic(ryaInstance); - consumer.subscribe(Arrays.asList(topic)); - final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000); + try(final Consumer<String, VisibilityStatement> consumer = + KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) { + // Subscribe for messages. + consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) ); - assertEquals(3, records.count()); - final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator(); + // Read the messages and extract their values. + final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator(); while(iter.hasNext()) { - final VisibilityStatement visiSet = iter.next().value(); - read.add(visiSet); + read.add( iter.next().value() ); } } @@ -131,35 +105,27 @@ public class LoadStatementsCommandIT { final String visibilities = "a|b|c"; final String[] args = new String[] { "--ryaInstance", "" + ryaInstance, - "--kafkaHostname", kafkaIp, - "--kafkaPort", kafkaPort, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), "--statementsFile", TURTLE_FILE.toString(), "--visibilities", visibilities }; + // Load the file of statements into the Statements topic. new LoadStatementsCommand().execute(args); // Show that the statements were loaded into the topic. - // Read a VisibilityBindingSet from the test topic. final List<VisibilityStatement> read = new ArrayList<>(); - final Properties consumerProps = new Properties(); - consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); - - try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { - final String topic = KafkaTopics.statementsTopic(ryaInstance); - consumer.subscribe(Arrays.asList(topic)); - final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000); + try(final Consumer<String, VisibilityStatement> consumer = + KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) { + // Subscribe for messages. + consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) ); - assertEquals(3, records.count()); - final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator(); + // Read the messages and extract their values. + final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator(); while(iter.hasNext()) { - final VisibilityStatement visiSet = iter.next().value(); - read.add(visiSet); + read.add( iter.next().value() ); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java new file mode 100644 index 0000000..788b41f --- /dev/null +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.command; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.interactor.LoadStatements; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException; +import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link RunQueryCommand}. + */ +public class RunQueryCommandIT { + + private final String ryaInstance = UUID.randomUUID().toString(); + + private QueryRepository queryRepo; + private Producer<String, VisibilityStatement> stmtProducer = null; + private Consumer<String, VisibilityBindingSet> resultConsumer = null; + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Before + public void setup() { + // Make sure the topic that the change log uses exists. + final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + kafka.createTopic(changeLogTopic); + + // Setup the QueryRepository used by the test. + final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); + queryRepo = new InMemoryQueryRepository(changeLog); + + // Initialize the Statements Producer and the Results Consumer. + stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class); + resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class); + } + + @After + public void cleanup() throws Exception{ + stmtProducer.close(); + resultConsumer.close(); + queryRepo.close(); + } + + @Test(expected = ExecutionException.class) + public void runUnregisteredQuery() throws Exception { + // Arguments that run a query that is not registered with Rya Streams. + final String[] args = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), + "--queryID", UUID.randomUUID().toString() + }; + + // Run the test. This will throw an exception. + final RunQueryCommand command = new RunQueryCommand(); + command.execute(args); + } + + @Test + public void runQuery() throws Exception { + // Register a query with the Query Repository. + final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }"); + + // Arguments that run the query we just registered with Rya Streams. + final String[] args = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), + "--queryID", sQuery.getQueryId().toString(), + "--zookeepers", kafka.getZookeeperServers() + }; + + // Create a new Thread that runs the command. + final Thread commandThread = new Thread() { + @Override + public void run() { + final RunQueryCommand command = new RunQueryCommand(); + try { + command.execute(args); + } catch (ArgumentsException | ExecutionException e) { + // Do nothing. Test will still fail because the expected results will be missing. + } + } + }; + + // Create the statements that will be loaded. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Alice"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:BurgerJoint")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Bob"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Charlie"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + + // Create the expected results. + final List<VisibilityBindingSet> expected = new ArrayList<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Charlie")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + + // Execute the test. This will result in a set of results that were read from the results topic. + final List<VisibilityBindingSet> results; + try { + // Wait for the program to start. + commandThread.start(); + Thread.sleep(5000); + + // Write some statements to the program. + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer); + loadStatements.fromCollection(statements); + + // Read the output of the streams program. + final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId()); + resultConsumer.subscribe( Lists.newArrayList(resultsTopic) ); + results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer); + } finally { + // Tear down the test. + commandThread.interrupt(); + commandThread.join(3000); + } + + // Show the read results matched the expected ones. + assertEquals(expected, results); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 16a8b8e..0ccbb6e 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -55,6 +55,16 @@ under the License. <!-- Kafka dependencies --> <dependency> <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index a8fbf23..3e0df50 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -20,12 +20,19 @@ package org.apache.rya.streams.kafka; import static java.util.Objects.requireNonNull; +import java.util.Properties; +import java.util.Set; import java.util.UUID; +import org.I0Itec.zkclient.ZkClient; import org.apache.rya.streams.api.queries.QueryChangeLog; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; /** * Creates the Kafka topic names that are used for Rya Streams systems. @@ -66,4 +73,36 @@ public class KafkaTopics { requireNonNull(queryId); return "QueryResults-" + queryId.toString(); } + + /** + * Creates a set of Kafka topics for each topic that does not already exist. + * + * @param zookeeperServers - The Zookeeper servers that are used by the Kafka Streams program. (not null) + * @param topicNames - The topics that will be created. (not null) + * @param partitions - The number of partitions that each of the topics will have. + * @param replicationFactor - The replication factor of the topics that are created. + */ + public static void createTopic( + final String zookeeperServers, + final Set<String> topicNames, + final int partitions, + final int replicationFactor) { + requireNonNull(zookeeperServers); + requireNonNull(topicNames); + + ZkUtils zkUtils = null; + try { + zkUtils = ZkUtils.apply(new ZkClient(zookeeperServers, 30000, 30000, ZKStringSerializer$.MODULE$), false); + for(final String topicName : topicNames) { + if(!AdminUtils.topicExists(zkUtils, topicName)) { + AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + } + } + finally { + if(zkUtils != null) { + zkUtils.close(); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java index 8ab3ab6..d3ec650 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java @@ -64,7 +64,6 @@ public class KafkaLoadStatements implements LoadStatements { this.producer = requireNonNull(producer); } - @Override public void fromFile(final Path statementsPath, final String visibilities) throws RyaStreamsException { requireNonNull(statementsPath); @@ -77,7 +76,7 @@ public class KafkaLoadStatements implements LoadStatements { parser.setRDFHandler(new RDFHandlerBase() { @Override public void startRDF() throws RDFHandlerException { - log.trace("starting loading statements."); + log.trace("Starting loading statements."); } @Override @@ -89,7 +88,7 @@ public class KafkaLoadStatements implements LoadStatements { @Override public void endRDF() throws RDFHandlerException { producer.flush(); - log.trace("done."); + log.trace("Done."); } }); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java new file mode 100644 index 0000000..e587998 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.interactor; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.RunQuery; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Streams implementation of {@link RunQuery}. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaRunQuery implements RunQuery { + private static final Logger log = LoggerFactory.getLogger(KafkaRunQuery.class); + + private final String kafkaHostname; + private final String kafkaPort; + private final String statementsTopic; + private final String resultsTopic; + private final TopologyBuilderFactory topologyFactory; + private final QueryRepository queryRepo; + + /** + * Constructs an instance of {@link KafkaRunQuery}. + * + * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null) + * @param kafkaPort - The port of the Kafka Broker to connect to. (not null) + * @param statementsTopic - The name of the topic that statements will be read from. (not null) + * @param resultsTopic - The name of the topic that query results will be writen to. (not null) + * @param queryRepo - The query repository that holds queries that are registered. (not null) + * @param topologyFactory - Builds Kafka Stream processing topologies from SPARQL. (not null) + */ + public KafkaRunQuery( + final String kafkaHostname, + final String kafkaPort, + final String statementsTopic, + final String resultsTopic, + final QueryRepository queryRepo, + final TopologyBuilderFactory topologyFactory) { + this.kafkaHostname = requireNonNull( kafkaHostname ); + this.kafkaPort = requireNonNull( kafkaPort ); + this.statementsTopic = requireNonNull(statementsTopic ); + this.resultsTopic = requireNonNull( resultsTopic ); + this.topologyFactory = requireNonNull( topologyFactory ); + this.queryRepo = requireNonNull( queryRepo ); + } + + @Override + public void run(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + + // Fetch the query from the repository. Throw an exception if it isn't present. + final Optional<StreamsQuery> query = queryRepo.get(queryId); + if(!query.isPresent()) { + throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because no such query " + + "is currently registered."); + } + + // Build a processing topology using the SPARQL, provided statements topic, and provided results topic. + final String sparql = query.get().getSparql(); + final TopologyBuilder topologyBuilder; + try { + topologyBuilder = topologyFactory.build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + } catch (final Exception e) { + throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because a processing " + + "topolgoy could not be built for the SPARQL " + sparql, e); + } + + // Setup the Kafka Stream program. + final Properties streamsProps = new Properties(); + streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort); + streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID()); + + final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); + + // If an unhandled exception is thrown, rethrow it. + streams.setUncaughtExceptionHandler((t, e) -> { + // Log the problem and kill the program. + log.error("Unhandled exception while processing the Rya Streams query. Shutting down.", e); + System.exit(1); + }); + + // Setup a shutdown hook that kills the streams program at shutdown. + final CountDownLatch awaitTermination = new CountDownLatch(1); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + awaitTermination.countDown(); + } + }); + + // Run the streams program and wait for termination. + streams.start(); + try { + awaitTermination.await(); + } catch (final InterruptedException e) { + log.warn("Interrupted while waiting for termination. Shutting down."); + } + streams.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java index d73b40e..d12957a 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java @@ -34,6 +34,7 @@ import org.openrdf.query.impl.MapBindingSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Charsets; import com.google.common.base.Joiner; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -70,12 +71,12 @@ public class KeyValueJoinStateStore implements JoinStateStore { /** * This is the minimum value in UTF-8 character. */ - private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 } ); + private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 }, Charsets.UTF_8); /** * This is the maximum value of a UTF-8 character. */ - private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF } ); + private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8); /** * A default empty value that is stored for a start of range or end of range marker.