RYA-377 Implemented a Kafka version of GetQueryResultStream.

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/fc9775e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/fc9775e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/fc9775e2

Branch: refs/heads/master
Commit: fc9775e2639b8efa263c4f54f32f233da7e0b397
Parents: 6056528
Author: kchilton2 <kevin.e.chil...@gmail.com>
Authored: Tue Oct 31 18:24:36 2017 -0400
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../streams/api/entity/QueryResultStream.java   |  24 ++-
 .../apache/rya/streams/kafka/KafkaTopics.java   |  21 +-
 .../kafka/entity/KafkaQueryResultStream.java    |  99 ++++++++++
 .../interactor/KafkaGetQueryResultStream.java   | 108 ++++++++++
 .../interactor/KafkaGetQueryResultStreamIT.java | 195 +++++++++++++++++++
 .../rya/test/kafka/KafkaTestInstanceRule.java   |  14 ++
 6 files changed, 455 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
index fdd62df..aa5dcfd 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
@@ -18,7 +18,8 @@
  */
 package org.apache.rya.streams.api.entity;
 
-import java.util.Collection;
+import static java.util.Objects.requireNonNull;
+
 import java.util.UUID;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
@@ -31,12 +32,26 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * An infinite stream of {@link VisibilityBindingSet}s that are the results of 
a query within Rya Streams.
  */
 @DefaultAnnotation(NonNull.class)
-public interface QueryResultStream extends AutoCloseable {
+public abstract class QueryResultStream implements AutoCloseable {
+
+    private final UUID queryId;
+
+    /**
+     * Constructs an instance of {@link QueryResultStream}.
+     *
+     * @param queryId - The query whose results this stream iterates over. 
(not null)
+     */
+    public QueryResultStream(final UUID queryId) {
+        this.queryId = requireNonNull(queryId);
+    }
+
 
     /**
      * @return Identifies which query in Rya Streams this result stream is 
over.
      */
-    public UUID getQueryId();
+    public UUID getQueryId() {
+        return queryId;
+    }
 
     /**
      * Wait at most {@code timeoutMs} milliseconds for the next collection of 
results.
@@ -44,7 +59,8 @@ public interface QueryResultStream extends AutoCloseable {
      * @param timeoutMs - The number of milliseconds to at most wait for the 
next collection of results. (not null)
      * @return The next collection of {@link VisibilityBindingSet}s that are 
the result of the query. Empty if
      *   there where no new results within the timout period.
+     * @throws IllegalStateException If the stream has been closed.
      * @throws RyaStreamsException Could not fetch the next set of results.
      */
-    public Collection<VisibilityBindingSet> poll(long timeoutMs) throws 
RyaStreamsException;
+    public abstract Iterable<VisibilityBindingSet> poll(long timeoutMs) throws 
IllegalStateException, RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index dfc4c9d..a8fbf23 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -18,6 +18,10 @@
  */
 package org.apache.rya.streams.kafka;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -30,23 +34,36 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public class KafkaTopics {
 
     /**
-     * Creates the Kafka topic that will be used for a specific instance of 
Rya's {@link QueryChangeLog}.
+     * Creates the Kafka topic name that is used for a specific instance of 
Rya's {@link QueryChangeLog}.
      *
      * @param ryaInstance - The Rya instance the change log is for. (not null)
      * @return The name of the Kafka topic.
      */
     public static String queryChangeLogTopic(final String ryaInstance) {
+        requireNonNull(ryaInstance);
         return ryaInstance + "-QueryChangeLog";
     }
 
     /**
-     * Creates the Kafka topic that will be used to load statements into the 
Rya Streams system for a specific
+     * Creates the Kafka topic name that is used to load statements into the 
Rya Streams system for a specific
      * instance of Rya.
      *
      * @param ryaInstance - The Rya instance the statements are for. (not null)
      * @return The name of the Kafka topic.
      */
     public static String statementsTopic(final String ryaInstance) {
+        requireNonNull(ryaInstance);
         return ryaInstance + "-Statements";
     }
+
+    /**
+     * Creates the Kafka topic name that is used for a specific query that is 
managed within Rya Streams.
+     *
+     * @param queryId - The id of the query the topic is for.
+     * @return The name of the Kafka topic.
+     */
+    public static String queryResultsTopic(final UUID queryId) {
+        requireNonNull(queryId);
+        return "QueryResults-" + queryId.toString();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
new file mode 100644
index 0000000..360aaa2
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.entity;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka implementation of {@link QueryResultStream}. It delegates the 
{@link #poll(long)} method to
+ * a {@link Consumer}. As a result, the starting point of this stream is 
whatever position the consumer
+ * starts at within the Kafka topic.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaQueryResultStream extends QueryResultStream {
+
+    private final Consumer<?, VisibilityBindingSet> consumer;
+
+    /**
+     * Constructs an instance of {@link KafkaQueryResultStream}.
+     *
+     * @param queryId - The query the results are for. (not null)
+     * @param consumer - The consumer that will be polled by this class. (not 
null)
+     */
+    public KafkaQueryResultStream(final UUID queryId, final Consumer<?, 
VisibilityBindingSet> consumer) {
+        super(queryId);
+        this.consumer = requireNonNull(consumer);
+    }
+
+    @Override
+    public Iterable<VisibilityBindingSet> poll(final long timeoutMs) throws 
RyaStreamsException {
+        return new RecordEntryIterable<>( consumer.poll(timeoutMs) );
+    }
+
+    /**
+     * An {@link Iterable} that creates {@link Iterator}s over a {@link 
ConsumerRecords}' values.
+     * This is useful for when you don't care about the key portion of a 
record.
+     *
+     * @param <K> - The type of the record's key.
+     * @param <T> - The type of the record's value.
+     */
+    private final class RecordEntryIterable<K, T> implements Iterable<T> {
+
+        private final ConsumerRecords<K, T> records;
+
+        public RecordEntryIterable(final ConsumerRecords<K, T> records) {
+            this.records = requireNonNull(records);
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return new Iterator<T>() {
+                private final Iterator<ConsumerRecord<K, T>> it = 
records.iterator();
+
+                @Override
+                public boolean hasNext() {
+                    return it.hasNext();
+                }
+
+                @Override
+                public T next() {
+                    return it.next().value();
+                }
+            };
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        consumer.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
new file mode 100644
index 0000000..b3c3fea
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka topic implementation of {@link GetQueryResultStream}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaGetQueryResultStream implements GetQueryResultStream {
+
+    private final String bootstrapServers;
+
+    /**
+     * Constructs an instance of {@link KafkaGetQueryResultStream}.
+     *
+     * @param kafkaHostname - The hostname of the Kafka Broker to connect to. 
(not null)
+     * @param kafkaPort - The port of the Kafka Broker to connect to. (not 
null)
+     */
+    public KafkaGetQueryResultStream(final String kafkaHostname, final String 
kafkaPort) {
+        requireNonNull(kafkaHostname);
+        requireNonNull(kafkaPort);
+        bootstrapServers = kafkaHostname + ":" + kafkaPort;
+    }
+
+    @Override
+    public QueryResultStream fromStart(final UUID queryId) throws 
RyaStreamsException {
+        requireNonNull(queryId);
+
+        // Always start at the earliest point within the topic.
+        return makeStream(queryId, "earliest");
+    }
+
+    @Override
+    public QueryResultStream fromNow(final UUID queryId) throws 
RyaStreamsException {
+        requireNonNull(queryId);
+
+        // Always start at the latest point within the topic.
+        return makeStream(queryId, "latest");
+    }
+
+    private QueryResultStream makeStream(final UUID queryId, final String 
autoOffsetResetConfig) {
+        // Configure which instance of Kafka to connect to.
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        // Nothing meaningful is in the key and the value is a 
VisibilityBindingSet.
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
VisibilityBindingSetDeserializer.class);
+
+        // Use a UUID for the Group Id so that we never register as part of 
the same group as another consumer.
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
+
+        // Set a client id so that server side logging can be traced.
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Query-Result-Stream-" + 
queryId);
+
+        // These consumers always start at a specific point and move forwards 
until the caller is finished with
+        // the returned stream, so never commit the consumer's progress.
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoOffsetResetConfig);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        // We are not closing the consumer here because the returned 
QueryResultStream is responsible for closing the
+        // underlying resources required to process it.
+        final KafkaConsumer<Object, VisibilityBindingSet> consumer = new 
KafkaConsumer<>(props);
+
+        // Register the consumer for the query's results.
+        final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+        consumer.subscribe(Arrays.asList(resultTopic));
+
+        // Return the result stream.
+        return new KafkaQueryResultStream(queryId, consumer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
new file mode 100644
index 0000000..3343f76
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests the methods of {@link KafkaGetQueryResultStream}.
+ */
+public class KafkaGetQueryResultStreamIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    /**
+     * @return A {@link Producer} that is able to write {@link 
VisibilityBindingSet}s.
+     */
+    private Producer<?, VisibilityBindingSet> makeProducer() {
+        final Properties producerProps = kafka.createBootstrapServerConfig();
+        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityBindingSetSerializer.class.getName());
+        return new KafkaProducer<>(producerProps);
+    }
+
+    /**
+     * Polls a {@link QueryResultStream} until it has either polled too many 
times without hitting
+     * the target number of results, or it hits the target number of results.
+     *
+     * @param pollMs - How long each poll could take.
+     * @param pollIterations - The maximum nubmer of polls that will be 
attempted.
+     * @param targetSize - The number of results to read before stopping.
+     * @param stream - The stream that will be polled.
+     * @return The results that were read from the stream.
+     * @throws Exception If the poll failed.
+     */
+    private List<VisibilityBindingSet> pollForResults(
+            final int pollMs,
+            final int pollIterations,
+            final int targetSize,
+            final QueryResultStream stream)  throws Exception{
+        final List<VisibilityBindingSet> read = new ArrayList<>();
+
+        int i = 0;
+        while(read.size() < targetSize && i < pollIterations) {
+            for(final VisibilityBindingSet visBs : stream.poll(pollMs)) {
+                read.add( visBs );
+            }
+            i++;
+        }
+
+        return read;
+    }
+
+    @Test
+    public void fromStart() throws Exception {
+        // Create an ID for the query.
+        final UUID queryId = UUID.randomUUID();
+
+        // Create a list of test VisibilityBindingSets.
+        final List<VisibilityBindingSet> original = new ArrayList<>();
+
+        final ValueFactory vf = new ValueFactoryImpl();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("urn:name", vf.createLiteral("Alice"));
+        original.add(new VisibilityBindingSet(bs, "a|b|c"));
+
+        bs = new MapBindingSet();
+        bs.addBinding("urn:name", vf.createLiteral("Bob"));
+        original.add(new VisibilityBindingSet(bs, "a"));
+
+        bs = new MapBindingSet();
+        bs.addBinding("urn:name", vf.createLiteral("Charlie"));
+        original.add(new VisibilityBindingSet(bs, "b|c"));
+
+        // Write some entries to the query result topic in Kafka.
+        try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) 
{
+            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            for(final VisibilityBindingSet visBs : original) {
+                producer.send(new ProducerRecord<>(resultTopic, visBs));
+            }
+        }
+
+        // Use the interactor that is being tested to read all of the 
visibility binding sets.
+        final GetQueryResultStream interactor = new 
KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
+        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, 
interactor.fromStart(queryId));
+
+        // Show the fetched binding sets match the original, as well as their 
order.
+        assertEquals(original, read);
+    }
+
+    @Test
+    public void fromNow() throws Exception {
+        // Create an ID for the query.
+        final UUID queryId = UUID.randomUUID();
+
+        try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) 
{
+            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+
+            // Write a single visibility binding set to the query's result 
topic. This will not appear in the expected results.
+            final ValueFactory vf = new ValueFactoryImpl();
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("urn:name", vf.createLiteral("Alice"));
+            producer.send(new ProducerRecord<>(resultTopic, new 
VisibilityBindingSet(bs, "a|b|c")));
+            producer.flush();
+
+            // Use the interactor that is being tested to read all of the 
visibility binding sets that appear after this point.
+            final GetQueryResultStream interactor = new 
KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
+            try(QueryResultStream results = interactor.fromNow(queryId)) {
+                // Read results from the stream.
+                List<VisibilityBindingSet> read = new ArrayList<>();
+                for(final VisibilityBindingSet visBs : results.poll(500)) {
+                    read.add(visBs);
+                }
+
+                // Show nothing has been read.
+                assertTrue(read.isEmpty());
+
+                // Write two more entries to the result topic. These will be 
seen by the result stream.
+                final List<VisibilityBindingSet> original = new ArrayList<>();
+
+                bs = new MapBindingSet();
+                bs.addBinding("urn:name", vf.createLiteral("Bob"));
+                original.add(new VisibilityBindingSet(bs, "a"));
+
+                bs = new MapBindingSet();
+                bs.addBinding("urn:name", vf.createLiteral("Charlie"));
+                original.add(new VisibilityBindingSet(bs, "b|c"));
+
+                for(final VisibilityBindingSet visBs : original) {
+                    producer.send(new ProducerRecord<>(resultTopic, visBs));
+                }
+                producer.flush();
+
+                // Read the results from the result stream.
+                read = pollForResults(500, 3, 2, results);
+
+                // Show the new entries were read.
+                assertEquals(original, read);
+            }
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void pollClosedStream() throws Exception {
+        // Create an ID for the query.
+        final UUID queryId = UUID.randomUUID();
+
+        // Use the interactor that is being tested to create a result stream 
and immediately close it.
+        final GetQueryResultStream interactor = new 
KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
+        final QueryResultStream results = interactor.fromStart(queryId);
+        results.close();
+
+        // Try to poll the closed stream.
+        results.poll(1);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git 
a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java 
b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
index 41a0a67..f76fa2b 100644
--- 
a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
+++ 
b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -103,4 +103,18 @@ public class KafkaTestInstanceRule extends 
ExternalResource {
     public Properties createBootstrapServerConfig() {
         return kafkaInstance.createBootstrapServerConfig();
     }
+
+    /**
+     * @return The hostname of the Kafka Broker.
+     */
+    public String getKafkaHostname() {
+        return kafkaInstance.getBrokerHost();
+    }
+
+    /**
+     * @return The port of the Kafka Broker.
+     */
+    public String getKafkaPort() {
+        return kafkaInstance.getBrokerPort();
+    }
 }
\ No newline at end of file

Reply via email to