Repository: incubator-rya Updated Branches: refs/heads/master 6ec8cd2aa -> 3d4a5d0e6
RYA-442 Implementing the Start and Stop Query interactors. Closes #265. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3d4a5d0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3d4a5d0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3d4a5d0e Branch: refs/heads/master Commit: 3d4a5d0e6e5766c2285fc5e359fb0d645818a8e9 Parents: 6ec8cd2 Author: kchilton2 <[email protected]> Authored: Fri Jan 12 14:48:55 2018 -0500 Committer: caleb <[email protected]> Committed: Thu Jan 18 14:28:24 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/api/entity/StreamsQuery.java | 19 ++- .../rya/streams/api/interactor/AddQuery.java | 4 +- .../interactor/defaults/DefaultAddQuery.java | 4 +- .../interactor/defaults/DefaultStartQuery.java | 54 +++++++ .../interactor/defaults/DefaultStopQuery.java | 54 +++++++ .../api/queries/InMemoryQueryRepository.java | 119 ++++++++++----- .../rya/streams/api/queries/QueryChange.java | 46 +++++- .../streams/api/queries/QueryRepository.java | 15 +- .../defaults/DefaultAddQueryTest.java | 6 +- .../queries/InMemoryQueryRepositoryTest.java | 39 +++-- .../streams/client/command/AddQueryCommand.java | 6 +- .../client/command/AddQueryCommandIT.java | 6 +- .../client/command/DeleteQueryCommandIT.java | 146 +++++++++---------- .../client/command/ListQueryCommandIT.java | 12 +- .../client/command/RunQueryCommandIT.java | 2 +- .../kafka/queries/KafkaQueryChangeLog.java | 25 ++-- .../kafka/interactor/KafkaRunQueryIT.java | 2 +- .../kafka/queries/KafkaQueryChangeLogIT.java | 38 ++++- 18 files changed, 424 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java index 8239025..bd750a6 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,16 +34,19 @@ public class StreamsQuery { private final UUID queryId; private final String sparql; + private final boolean isActive; /** * Constructs an instance of {@link StreamsQuery}. * * @param queryId - Uniquely identifies the query within Rya Streams. (not null) * @param sparql - The SPARQL query that defines how statements will be processed. (not null) + * @param isActive - {@code true} if Rya Streams should process this query; otherwise {@code false}. */ - public StreamsQuery(final UUID queryId, final String sparql) { + public StreamsQuery(final UUID queryId, final String sparql, final boolean isActive) { this.queryId = requireNonNull(queryId); this.sparql = requireNonNull(sparql); + this.isActive = isActive; } /** @@ -60,9 +63,16 @@ public class StreamsQuery { return sparql; } + /** + * @return {@code true} if Rya Streams should process this query; otherwise {@code false}. + */ + public boolean isActive() { + return isActive; + } + @Override public int hashCode() { - return Objects.hash(queryId, sparql); + return Objects.hash(queryId, sparql, isActive); } @Override @@ -70,7 +80,8 @@ public class StreamsQuery { if(o instanceof StreamsQuery) { final StreamsQuery other = (StreamsQuery) o; return Objects.equals(queryId, other.queryId) && - Objects.equals(sparql, other.sparql); + Objects.equals(sparql, other.sparql) && + isActive == other.isActive; } return false; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java index 8915d98..9889fd0 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java @@ -34,8 +34,10 @@ public interface AddQuery { * Adds a query to the Rya Streams system. * * @param query - The SPARQL query that will be added. (not null) + * @param isActive - {@code true} if the query needs to be maintained by + * Rya Streams; otherwise {@code false}. * @return The {@link StreamsQuery} used by Rya Streams for this query. * @throws RyaStreamsException The query could not be added to Rya Streams. */ - public StreamsQuery addQuery(final String query) throws RyaStreamsException; + public StreamsQuery addQuery(final String query, boolean isActive) throws RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java index f94835c..edd90fd 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java @@ -49,7 +49,7 @@ public class DefaultAddQuery implements AddQuery { } @Override - public StreamsQuery addQuery(final String query) throws RyaStreamsException { + public StreamsQuery addQuery(final String query, final boolean isActive) throws RyaStreamsException { requireNonNull(query); // Make sure the SPARQL is valid. @@ -60,6 +60,6 @@ public class DefaultAddQuery implements AddQuery { } // If it is, then store it in the repository. - return repository.add(query); + return repository.add(query, isActive); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java new file mode 100644 index 0000000..3ee693e --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.UUID; + +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.StartQuery; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Start a query that is managed by Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public class DefaultStartQuery implements StartQuery { + + private final QueryRepository repository; + + /** + * Constructs an instance of {@link DefaultStartQuery}. + * + * @param repository - The {@link QueryRepository} that will be interacted with. (not null) + */ + public DefaultStartQuery(final QueryRepository repository) { + this.repository = requireNonNull(repository); + } + + @Override + public void start(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + repository.updateIsActive(queryId, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java new file mode 100644 index 0000000..382b0f1 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.UUID; + +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.StopQuery; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Stop a query that is managed by Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public class DefaultStopQuery implements StopQuery { + + private final QueryRepository repository; + + /** + * Constructs an instance of {@link DefaultStopQuery}. + * + * @param repository - The {@link QueryRepository} that will be interacted with. (not null) + */ + public DefaultStopQuery(final QueryRepository repository) { + this.repository = requireNonNull(repository); + } + + @Override + public void stop(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + repository.updateIsActive(queryId, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index 80678de..f4b7b25 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -33,16 +33,15 @@ import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; - import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import info.aduna.iteration.CloseableIteration; /** - * An in memory implementation of {@link QueryRepository}. It is lazily initialized the first time one of its - * functions is invoked. + * An in memory implementation of {@link QueryRepository}. It is lazily + * initialized the first time one of its functions is invoked and it updates + * its view of the {@link QueryChangeLog} any time a method is invoked that + * requires the latest view of the queries. * </p> * Thread safe. */ @@ -51,39 +50,47 @@ public class InMemoryQueryRepository implements QueryRepository { private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class); private final ReentrantLock lock = new ReentrantLock(); - private final Supplier<Map<UUID, StreamsQuery>> queriesCache; + /** + * The change log that is the ground truth for describing what the queries look like. + */ private final QueryChangeLog changeLog; /** + * Represents the position within the {@link QueryChangeLog} that {@code queriesCache} represents. + */ + private Optional<Long> cachePosition = Optional.empty(); + + /** + * The most recently cached view of the queries within this repository. + */ + private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>(); + + /** * Constructs an instance of {@link InMemoryQueryRepository}. * * @param changeLog - The change log that this repository will maintain and be based on. (not null) */ public InMemoryQueryRepository(final QueryChangeLog changeLog) { this.changeLog = requireNonNull(changeLog); - - // Lazily initialize the queries cache the first time you try to use it. - queriesCache = Suppliers.memoize(() -> initializeCache(changeLog)); } @Override - public StreamsQuery add(final String query) throws QueryRepositoryException { + public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException { requireNonNull(query); lock.lock(); try { // First record the change to the log. final UUID queryId = UUID.randomUUID(); - final QueryChange change = QueryChange.create(queryId, query); + final QueryChange change = QueryChange.create(queryId, query, isActive); changeLog.write(change); - // Then update the view of the change log within the repository. - final StreamsQuery streamsQuery = new StreamsQuery(queryId, query); - queriesCache.get().put(queryId, streamsQuery); + // Update the cache to represent what is currently in the log. + updateCache(); - // Return the SreamsQuery that represents the just added query. - return streamsQuery; + // Return the StreamsQuery that represents the just added query. + return queriesCache.get(queryId); } catch (final QueryChangeLogException e) { throw new QueryRepositoryException("Could not create a Rya Streams query for the SPARQL string: " + query, e); @@ -98,7 +105,35 @@ public class InMemoryQueryRepository implements QueryRepository { lock.lock(); try { - return Optional.ofNullable( queriesCache.get().get(queryId) ); + // Update the cache to represent what is currently in the log. + updateCache(); + + return Optional.ofNullable( queriesCache.get(queryId) ); + } finally { + lock.unlock(); + } + } + + @Override + public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException { + requireNonNull(queryId); + + lock.lock(); + try { + // Update the cache to represent what is currently in the log. + updateCache(); + + // Ensure the query is in the log. + if(!queriesCache.containsKey(queryId)) { + throw new QueryRepositoryException("No query exists for ID " + queryId + "."); + } + + // First record the change to the log. + final QueryChange change = QueryChange.update(queryId, isActive); + changeLog.write(change); + + } catch (final QueryChangeLogException e) { + throw new QueryRepositoryException("Could not update the Rya Streams query for with ID: " + queryId, e); } finally { lock.unlock(); } @@ -114,9 +149,6 @@ public class InMemoryQueryRepository implements QueryRepository { final QueryChange change = QueryChange.delete(queryId); changeLog.write(change); - // Then update the view of the change log within the repository. - queriesCache.get().remove(queryId); - } catch (final QueryChangeLogException e) { throw new QueryRepositoryException("Could not delete a Rya Streams query for the Query ID: " + queryId, e); } finally { @@ -128,8 +160,11 @@ public class InMemoryQueryRepository implements QueryRepository { public Set<StreamsQuery> list() throws QueryRepositoryException { lock.lock(); try { - // Our internal cache is already up to date, so just return it's values. - return queriesCache.get().values() + // Update the cache to represent what is currently in the log. + updateCache(); + + // Our internal cache is already up to date, so just return its values. + return queriesCache.values() .stream() .collect(Collectors.toSet()); @@ -149,22 +184,19 @@ public class InMemoryQueryRepository implements QueryRepository { } /** - * A {@link Map} from query id to the {@link StreamsQuery} that is represented by that id based on what - * is already in a {@link QueryChangeLog}. - * - * @param changeLog - The change log the cache will represent. (not null) - * @return The most recent view of the change log. + * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}. */ - private static Map<UUID, StreamsQuery> initializeCache(final QueryChangeLog changeLog) { + private void updateCache() { requireNonNull(changeLog); - // The Map that will be initialized and then returned by this supplier. - final Map<UUID, StreamsQuery> queriesCache = new HashMap<>(); - CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null; try { - // Iterate over everything that is already in the change log. - it = changeLog.readFromStart(); + // Iterate over everything since the last position that was handled within the change log. + if(cachePosition.isPresent()) { + it = changeLog.readFromPosition(cachePosition.get() + 1); + } else { + it = changeLog.readFromStart(); + } // Apply each change to the cache. while(it.hasNext()) { @@ -174,18 +206,31 @@ public class InMemoryQueryRepository implements QueryRepository { switch(change.getChangeType()) { case CREATE: - final StreamsQuery query = new StreamsQuery(queryId, change.getSparql().get()); + final StreamsQuery query = new StreamsQuery( + queryId, + change.getSparql().get(), + change.getIsActive().get()); queriesCache.put(queryId, query); break; + case UPDATE: + if(queriesCache.containsKey(queryId)) { + final StreamsQuery old = queriesCache.get(queryId); + final StreamsQuery updated = new StreamsQuery( + old.getQueryId(), + old.getSparql(), + change.getIsActive().get()); + queriesCache.put(queryId, updated); + } + break; + case DELETE: queriesCache.remove(queryId); break; } - } - // Return the initialized cache. - return queriesCache; + cachePosition = Optional.of( entry.getPosition() ); + } } catch (final QueryChangeLogException e) { // Rethrow the exception because the object the supplier tried to create could not be created. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java index 90af79c..d283957 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -37,9 +37,11 @@ import edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public final class QueryChange implements Serializable { private static final long serialVersionUID = 1L; + private final UUID queryId; private final ChangeType changeType; private final Optional<String> sparql; + private final Optional<Boolean> isActive; /** * Constructs an instance of {@link QueryChange}. Use the {@link #create(UUID, String)} or {@link #delete(UUID)} @@ -48,14 +50,18 @@ public final class QueryChange implements Serializable { * @param queryId - Uniquely identifies the query within Rya Streams. (not null) * @param changeType - Indicates the type of change this object represents. (not null) * @param sparql - If this is a create change, then the SPARQL query that will be evaluated within Rya Streams. (not null) + * @param isActive - If this is a create or update change, then the active state that defines if the + * query will be evaluated by RyaStreams. (not null) */ private QueryChange( final UUID queryId, final ChangeType changeType, - final Optional<String> sparql) { + final Optional<String> sparql, + final Optional<Boolean> isActive) { this.queryId = requireNonNull(queryId); this.changeType = requireNonNull(changeType); this.sparql = requireNonNull(sparql); + this.isActive = requireNonNull(isActive); } /** @@ -79,9 +85,17 @@ public final class QueryChange implements Serializable { return sparql; } + /** + * @return If this is a create or update change, then the active state that defines if the + * query will be evaluated by RyaStreams. (not null) + */ + public Optional<Boolean> getIsActive() { + return isActive; + } + @Override public int hashCode() { - return Objects.hash(queryId, changeType, sparql); + return Objects.hash(queryId, changeType, sparql, isActive); } @Override @@ -90,7 +104,8 @@ public final class QueryChange implements Serializable { final QueryChange change = (QueryChange) o; return Objects.equals(queryId, change.queryId) && Objects.equals(changeType, change.changeType) && - Objects.equals(sparql, change.sparql); + Objects.equals(sparql, change.sparql) && + Objects.equals(isActive, change.isActive); } return false; } @@ -100,10 +115,22 @@ public final class QueryChange implements Serializable { * * @param queryId - Uniquely identifies the query within the streaming system. (not null) * @param sparql - The query that will be evaluated. (not null) + * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null) + * @return A {@link QueryChange} built using the provided values. + */ + public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive) { + return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive)); + } + + /** + * Create a {@link QueryChange} that represents a query in Rya Streams whose active state has changed. + * + * @param queryId - Uniquely identifies the query within the streaming system. (not null) + * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null) * @return A {@link QueryChange} built using the provided values. */ - public static QueryChange create(final UUID queryId, final String sparql) { - return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql)); + public static QueryChange update(final UUID queryId, final boolean isActive) { + return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive)); } /** @@ -113,7 +140,7 @@ public final class QueryChange implements Serializable { * @return A {@link QueryChange} built using the provided values. */ public static QueryChange delete(final UUID queryId) { - return new QueryChange(queryId, ChangeType.DELETE, Optional.absent()); + return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent()); } /** @@ -126,6 +153,11 @@ public final class QueryChange implements Serializable { CREATE, /** + * The {@link QueryChange} indicates something about a registered query changed. + */ + UPDATE, + + /** * The {@link QueryChange} indicates a SPARQL query no longer needs to be processed by Rya Streams. */ DELETE; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java index 7269588..fd51b2f 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java @@ -38,10 +38,23 @@ public interface QueryRepository extends AutoCloseable { * Adds a new query to Rya Streams. * * @param query - The SPARQL query to add. (not null) + * @param isActive - {@code true} if the query should be processed after it is added + * otherwise {@code false}. * @return The {@link StreamsQuery} used in Rya Streams. * @throws QueryRepositoryException Could not add the query. */ - public StreamsQuery add(final String query) throws QueryRepositoryException; + public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException; + + /** + * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true} + * means Rya Streams will start processing the query. Setting it to {@code false} will stop + * the processing. + * + * @param queryId - Identifies which query will be updated. (not null) + * @param isActive - The new isActive state for the query. + * @throws QueryRepositoryException If the query does not exist or something else caused the change to fail. + */ + public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException; /** * Get an existing query from Rya Streams. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java index 88be6e7..77a0a15 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java @@ -43,10 +43,10 @@ public class DefaultAddQueryTest { final AddQuery addQuery = new DefaultAddQuery(repo); // Add the query. - addQuery.addQuery(sparql); + addQuery.addQuery(sparql, true); // Verify the call was forwarded to the repository. - verify(repo, times(1)).add(eq(sparql)); + verify(repo, times(1)).add(eq(sparql), eq(true)); } @Test(expected = RyaStreamsException.class) @@ -59,6 +59,6 @@ public class DefaultAddQueryTest { final AddQuery addQuery = new DefaultAddQuery(repo); // Add the query. - addQuery.addQuery(sparql); + addQuery.addQuery(sparql, true); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 92193ca..22e616d 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -43,9 +43,9 @@ public class InMemoryQueryRepositoryTest { try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { // Add some queries to it. final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - expected.add( queries.add("query 2") ); - expected.add( queries.add("query 3") ); + expected.add( queries.add("query 1", true) ); + expected.add( queries.add("query 2", false) ); + expected.add( queries.add("query 3", true) ); // Show they are in the list of all queries. final Set<StreamsQuery> stored = queries.list(); @@ -59,9 +59,9 @@ public class InMemoryQueryRepositoryTest { try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { // Add some queries to it. The second one we will delete. final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - final UUID deletedMeId = queries.add("query 2").getQueryId(); - expected.add( queries.add("query 3") ); + expected.add( queries.add("query 1", true) ); + final UUID deletedMeId = queries.add("query 2", false).getQueryId(); + expected.add( queries.add("query 3", true) ); // Delete the second query. queries.delete( deletedMeId ); @@ -73,15 +73,15 @@ public class InMemoryQueryRepositoryTest { } @Test - public void initializedWithPopulatedChnageLog() throws Exception { + public void initializedWithPopulatedChangeLog() throws Exception { // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later. final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) { // Add some queries and deletes to it. final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1") ); - final UUID deletedMeId = queries.add("query 2").getQueryId(); - expected.add( queries.add("query 3") ); + expected.add( queries.add("query 1", true) ); + final UUID deletedMeId = queries.add("query 2", false).getQueryId(); + expected.add( queries.add("query 3", true) ); queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. @@ -110,7 +110,7 @@ public class InMemoryQueryRepositoryTest { // Setup a totally in memory QueryRepository. try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { // Add a query to it. - final StreamsQuery query = queries.add("query 1"); + final StreamsQuery query = queries.add("query 1", true); // Show the fetched query matches the expected ones. final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); @@ -129,4 +129,21 @@ public class InMemoryQueryRepositoryTest { assertFalse(query.isPresent()); } } + + @Test + public void update() throws Exception { + // Setup a totally in memory QueryRepository. + try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + // Add a query to it. + final StreamsQuery query = queries.add("query 1", true); + + // Change the isActive state of that query. + queries.updateIsActive(query.getQueryId(), false); + + // Show the fetched query matches the expected one. + final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); + final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false); + assertEquals(expected, fetched.get()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java index c72e6a2..275a975 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java @@ -52,6 +52,9 @@ public class AddQueryCommand implements RyaStreamsCommand { @Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.") private String query; + @Parameter(names = {"--isActive", "-a"}, required = false, description = "True if the added query will be started.") + private String isActive; + @Override public String toString() { final StringBuilder parameters = new StringBuilder(); @@ -60,6 +63,7 @@ public class AddQueryCommand implements RyaStreamsCommand { if (!Strings.isNullOrEmpty(query)) { parameters.append("\tQuery: " + query + "\n"); } + parameters.append("\tIs Active: " + isActive + "\n"); return parameters.toString(); } } @@ -115,7 +119,7 @@ public class AddQueryCommand implements RyaStreamsCommand { try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { final AddQuery addQuery = new DefaultAddQuery(queryRepo); try { - final StreamsQuery query = addQuery.addQuery(params.query); + final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive)); System.out.println("Added query: " + query.getSparql()); } catch (final RyaStreamsException e) { System.err.println("Unable to parse query: " + params.query); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java index 3a412d2..8b4f074 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java @@ -80,7 +80,8 @@ public class AddQueryCommandIT { "-r", "" + ryaInstance, "-i", kafka.getKafkaHostname(), "-p", kafka.getKafkaPort(), - "-q", query + "-q", query, + "-a", "true" }; // Execute the command. @@ -101,7 +102,8 @@ public class AddQueryCommandIT { "--ryaInstance", "" + ryaInstance, "--kafkaHostname", kafka.getKafkaHostname(), "--kafkaPort", kafka.getKafkaPort(), - "--query", query + "--query", query, + "--isActive", "true" }; // Execute the command. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java index 91647f2..6083543 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,7 +18,6 @@ */ package org.apache.rya.streams.client.command; -import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -40,6 +39,8 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.apache.rya.test.kafka.KafkaTestUtil; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -48,103 +49,90 @@ import org.junit.Test; */ public class DeleteQueryCommandIT { + private final String ryaInstance = UUID.randomUUID().toString(); + private QueryRepository queryRepo; + @Rule public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - /** - * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need - * to re-create the repo outside of the command to ensure it has the most up to date values inside of it. - * - * @param ryaInstance - The rya instance the repository is connected to. (not null) - * @param createTopic - Set this to true if the topic doesn't exist yet. - */ - private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) { - requireNonNull(ryaInstance); - + @Before + public void setup() { // Make sure the topic that the change log uses exists. - final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); - if(createTopic) { - kafka.createTopic(changeLogTopic); - } + final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance); + System.out.println("Test Change Log Topic: " + changeLogTopic); + kafka.createTopic(changeLogTopic); // Setup the QueryRepository used by the test. final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); - return new InMemoryQueryRepository(changeLog); + queryRepo = new InMemoryQueryRepository(changeLog); + } + + @After + public void cleanup() throws Exception { + queryRepo.close(); } @Test public void shortParams() throws Exception { - final String ryaInstance = UUID.randomUUID().toString(); - // Add a few queries to Rya Streams. - try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) { - repo.add("query1"); - final UUID query2Id = repo.add("query2").getQueryId(); - repo.add("query3"); - - // Show that all three of the queries were added. - Set<StreamsQuery> queries = repo.list(); - assertEquals(3, queries.size()); - - // Delete query 2 using the delete query command. - final String[] deleteArgs = new String[] { - "-r", "" + ryaInstance, - "-i", kafka.getKafkaHostname(), - "-p", kafka.getKafkaPort(), - "-q", query2Id.toString() - }; - - final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); - deleteCommand.execute(deleteArgs); - - // Show query2 was deleted. - try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) { - queries = repo2.list(); - assertEquals(2, queries.size()); - - for(final StreamsQuery query : queries) { - assertNotEquals(query2Id, query.getQueryId()); - } - } + queryRepo.add("query1", true); + final UUID query2Id = queryRepo.add("query2", false).getQueryId(); + queryRepo.add("query3", true); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = queryRepo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "-r", ryaInstance, + "-i", kafka.getKafkaHostname(), + "-p", kafka.getKafkaPort(), + "-q", query2Id.toString() + }; + + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + queries = queryRepo.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); } } @Test public void longParams() throws Exception { - final String ryaInstance = UUID.randomUUID().toString(); - // Add a few queries to Rya Streams. - try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) { - repo.add("query1"); - final UUID query2Id = repo.add("query2").getQueryId(); - repo.add("query3"); - - // Show that all three of the queries were added. - Set<StreamsQuery> queries = repo.list(); - assertEquals(3, queries.size()); - - // Delete query 2 using the delete query command. - final String[] deleteArgs = new String[] { - "--ryaInstance", "" + ryaInstance, - "--kafkaHostname", kafka.getKafkaHostname(), - "--kafkaPort", kafka.getKafkaPort(), - "--queryID", query2Id.toString() - }; - - final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); - deleteCommand.execute(deleteArgs); - - // Show query2 was deleted. - try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) { - queries = repo2.list(); - assertEquals(2, queries.size()); - - for(final StreamsQuery query : queries) { - assertNotEquals(query2Id, query.getQueryId()); - } - } + queryRepo.add("query1", true); + final UUID query2Id = queryRepo.add("query2", false).getQueryId(); + queryRepo.add("query3", true); + + // Show that all three of the queries were added. + Set<StreamsQuery> queries = queryRepo.list(); + assertEquals(3, queries.size()); + + // Delete query 2 using the delete query command. + final String[] deleteArgs = new String[] { + "--ryaInstance", "" + ryaInstance, + "--kafkaHostname", kafka.getKafkaHostname(), + "--kafkaPort", kafka.getKafkaPort(), + "--queryID", query2Id.toString() + }; + + final DeleteQueryCommand deleteCommand = new DeleteQueryCommand(); + deleteCommand.execute(deleteArgs); + + // Show query2 was deleted. + queries = queryRepo.list(); + assertEquals(2, queries.size()); + + for(final StreamsQuery query : queries) { + assertNotEquals(query2Id, query.getQueryId()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java index 00b4ce0..1399142 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -71,9 +71,9 @@ public class ListQueryCommandIT { @Test public void shortParams() throws Exception { // Add a few queries to Rya Streams. - queryRepo.add("query1"); - queryRepo.add("query2"); - queryRepo.add("query3"); + queryRepo.add("query1", true); + queryRepo.add("query2", false); + queryRepo.add("query3", true); // Execute the List Queries command. final String[] args = new String[] { @@ -89,9 +89,9 @@ public class ListQueryCommandIT { @Test public void longParams() throws Exception { // Add a few queries to Rya Streams. - queryRepo.add("query1"); - queryRepo.add("query2"); - queryRepo.add("query3"); + queryRepo.add("query1", true); + queryRepo.add("query2", false); + queryRepo.add("query3", true); // Execute the List Queries command. final String[] args = new String[] { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java index f2100e8..3389d6b 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -114,7 +114,7 @@ public class RunQueryCommandIT { @Test public void runQuery() throws Exception { // Register a query with the Query Repository. - final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }"); + final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true); // Arguments that run the query we just registered with Rya Streams. final String[] args = new String[] { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java index 9403e4b..2822272 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java @@ -140,18 +140,13 @@ public class KafkaQueryChangeLog implements QueryChangeLog { @Override public boolean hasNext() throws QueryChangeLogException { - if (iterCache == null || !iterCache.hasNext()) { - populateCache(); - } + maybePopulateCache(); return iterCache.hasNext(); } @Override public ChangeLogEntry<QueryChange> next() throws QueryChangeLogException { - if (iterCache == null && iterCache.hasNext()) { - populateCache(); - } - + maybePopulateCache(); if (iterCache.hasNext()) { return iterCache.next(); } @@ -167,14 +162,14 @@ public class KafkaQueryChangeLog implements QueryChangeLog { consumer.unsubscribe(); } - private void populateCache() { - final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L); - final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>(); - records.forEach( - record -> - changes.add(new ChangeLogEntry<>(record.offset(), record.value())) - ); - iterCache = changes.iterator(); + private void maybePopulateCache() { + // If the cache isn't initialized yet, or it is empty, then check to see if there is more to put into it. + if (iterCache == null || !iterCache.hasNext()) { + final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L); + final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>(); + records.forEach(record -> changes.add(new ChangeLogEntry<>(record.offset(), record.value()))); + iterCache = changes.iterator(); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java index 33b3a92..9a773f0 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java @@ -86,7 +86,7 @@ public class KafkaRunQueryIT { final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() ); // Add the query to the query repository. - final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }"); + final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true); final UUID queryId = sQuery.getQueryId(); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java index 04c81ed..c2b821f 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java @@ -19,6 +19,7 @@ package org.apache.rya.streams.kafka.queries; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.util.ArrayList; import java.util.List; @@ -78,7 +79,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase { public void testWrite() throws Exception { final String sparql = "SOME QUERY HERE"; final UUID uuid = UUID.randomUUID(); - final QueryChange newChange = QueryChange.create(uuid, sparql); + final QueryChange newChange = QueryChange.create(uuid, sparql, true); changeLog.write(newChange); consumer.subscribe(Lists.newArrayList(topic)); @@ -90,6 +91,17 @@ public class KafkaQueryChangeLogIT extends KafkaITBase { } @Test + public void readSingleWrite() throws Exception { + // Write a single change to the log. + final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true); + changeLog.write(change); + + // Read that entry from the log. + final QueryChange readChange = changeLog.readFromStart().next().getEntry(); + assertEquals(change, readChange); + } + + @Test public void readFromBegining() throws Exception { final List<QueryChange> expected = write10ChangesToChangeLog(); @@ -175,12 +187,34 @@ public class KafkaQueryChangeLogIT extends KafkaITBase { assertEquals(0, count); } + @Test + public void multipleClients() throws Exception { + // Create a second KafkaQueryChangeLog objects that connect to the same change log. + final Producer<?, QueryChange> producer2 = KafkaTestUtil.makeProducer(rule, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange> consumer2 = KafkaTestUtil.fromStartConsumer(rule, StringDeserializer.class, QueryChangeDeserializer.class); + try(final KafkaQueryChangeLog changeLog2 = new KafkaQueryChangeLog(producer2, consumer2, topic)) { + // Show both of them report empty. + assertFalse( changeLog.readFromStart().hasNext() ); + assertFalse( changeLog2.readFromStart().hasNext() ); + + // Write a change to the first log. + final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true); + changeLog.write(change); + + // Show it's in the first log. + assertEquals(change, changeLog.readFromStart().next().getEntry()); + + // Show it is also seen in the second log. + assertEquals(change, changeLog2.readFromStart().next().getEntry()); + } + } + private List<QueryChange> write10ChangesToChangeLog() throws Exception { final List<QueryChange> changes = new ArrayList<>(); for (int ii = 0; ii < 10; ii++) { final String sparql = "SOME QUERY HERE_" + ii; final UUID uuid = UUID.randomUUID(); - final QueryChange newChange = QueryChange.create(uuid, sparql); + final QueryChange newChange = QueryChange.create(uuid, sparql, true); changeLog.write(newChange); changes.add(newChange); }
