Repository: incubator-rya
Updated Branches:
  refs/heads/master 529f2d595 -> b03b18938


RYA-128 closes #121; closes RYA-128 trigger service to Kafka.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b03b1893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b03b1893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b03b1893

Branch: refs/heads/master
Commit: b03b18938c55ff3e896b3d9969a7b5dc54753129
Parents: 529f2d5
Author: David W. Lotts <david.lo...@parsons.com>
Authored: Tue Nov 1 17:37:07 2016 -0400
Committer: pujav65 <puja...@gmail.com>
Committed: Tue Apr 4 08:55:38 2017 -0400

----------------------------------------------------------------------
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    |  18 +-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |  48 ++-
 .../app/export/rya/BindingSetSerializer.java    | 137 +++++++++
 .../app/export/rya/KafkaExportParameters.java   |  84 ++++++
 .../app/export/rya/KafkaResultExporter.java     |  75 +++++
 .../export/rya/KafkaResultExporterFactory.java  |  64 ++++
 .../fluo/app/observers/QueryResultObserver.java |  14 +-
 .../export/rya/KafkaExportParametersTest.java   |  97 +++++++
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |  34 +++
 .../apache/rya/indexing/pcj/fluo/ITBase.java    |  30 +-
 .../pcj/fluo/integration/KafkaExportIT.java     | 290 +++++++++++++++++++
 extras/rya.prospector/pom.xml                   |  39 +++
 12 files changed, 905 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 6567371..d29191d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -132,12 +132,12 @@ public class CreatePcj {
      * @throws SailException Historic PCJ results could not be loaded because 
of a problem with {@code rya}.
      * @throws QueryEvaluationException Historic PCJ results could not be 
loaded because of a problem with {@code rya}.
      */
-       public void withRyaIntegration(final String pcjId, final 
PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
-                       final Connector accumulo, String ryaInstance )
-                                       throws MalformedQueryException, 
PcjException, SailException, QueryEvaluationException, RyaDAOException {
-               requireNonNull(pcjId);
-               requireNonNull(pcjStorage);
-               requireNonNull(fluo);
+    public String  withRyaIntegration(final String pcjId, final 
PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
+            final Connector accumulo, String ryaInstance )
+                    throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException, RyaDAOException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
                requireNonNull(accumulo);
                requireNonNull(ryaInstance);
                
@@ -162,13 +162,16 @@ public class CreatePcj {
                final ParsedQuery parsedQuery = new 
SPARQLParser().parseQuery(sparql, null);
                final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
 
+        // return queryId to the caller for later monitoring from the export.
+        String queryId = null;
+
                try (Transaction tx = fluo.newTransaction()) {
                        // Write the query's structure to Fluo.
                        new FluoQueryMetadataDAO().write(tx, fluoQuery);
 
                        // The results of the query are eventually exported to 
an instance
                        // of Rya, so store the Rya ID for the PCJ.
-                       final String queryId = 
fluoQuery.getQueryMetadata().getNodeId();
+            queryId = fluoQuery.getQueryMetadata().getNodeId();
                        tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
                        tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, 
queryId);
 
@@ -206,6 +209,7 @@ public class CreatePcj {
                        writeBatch(fluo, triplesBatch);
                        triplesBatch.clear();
                }
+        return queryId;
        }
     
     

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index fd2e582..de51008 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -33,7 +33,9 @@ under the License.
         A Fluo implementation of Rya Precomputed Join Indexing. This module 
produces
         a jar that may be executed by the 'fluo' command line tool as a YARN 
job.
     </description>
-
+    <properties>
+        <kryo.version>3.0.3</kryo.version>
+    </properties>
     <dependencies>
         <!-- Rya Runtime Dependencies. -->
         <dependency>
@@ -62,6 +64,50 @@ under the License.
                </exclusion>
             </exclusions>
         </dependency>
+        
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+          <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+                <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+          <version>0.10.1.0</version>
+          <classifier>test</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <classifier>test</classifier>
+<!--             <scope>test</scope> -->
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        
         <!-- Testing dependencies. -->
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
new file mode 100644
index 0000000..7b35fec
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.ListBindingSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for {@link VisibilityBindingSet}s.
+ * <p>
+ * Wire format (written with Kryo's {@link Output}, read with Kryo's {@link Input}): the
+ * visibility string, the number of bindings, then for every binding its name, the data
+ * string of its value, and the datatype URI of its value.
+ */
+public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
+    /** One {@link Kryo} per thread; it is passed to the internal serializer on every call. */
+    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            return new Kryo();
+        }
+    };
+
+    /**
+     * Rebuilds a binding set from bytes produced by {@link #serialize(String, VisibilityBindingSet)}.
+     *
+     * @param topic - The topic the bytes were read from. (unused)
+     * @param data - The serialized binding set; may be {@code null}.
+     * @return The decoded binding set, or {@code null} when {@code data} is {@code null}.
+     */
+    @Override
+    public VisibilityBindingSet deserialize(String topic, byte[] data) {
+        // A null payload cannot be wrapped in a ByteArrayInputStream; pass it through as null.
+        if (data == null) {
+            return null;
+        }
+        // A text-based alternative would be:
+        // (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null)
+        final Input input = new Input(new ByteArrayInputStream(data));
+        return new KryoInternalSerializer().read(kryos.get(), input, VisibilityBindingSet.class);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // Do nothing.
+    }
+
+    /**
+     * Encodes a binding set using the wire format described on this class.
+     *
+     * @param topic - The topic the bytes will be sent to. (unused)
+     * @param data - The binding set to encode; may be {@code null}.
+     * @return The encoded bytes, or {@code null} when {@code data} is {@code null}.
+     */
+    @Override
+    public byte[] serialize(String topic, VisibilityBindingSet data) {
+        // Nothing to encode; returning null avoids a NullPointerException in the writer.
+        if (data == null) {
+            return null;
+        }
+        // A text-based alternative would be:
+        // (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8)
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final Output output = new Output(baos);
+        new KryoInternalSerializer().write(kryos.get(), output, data);
+        // Output buffers internally; flush so every byte reaches the backing stream.
+        output.flush();
+        return baos.toByteArray();
+    }
+
+    @Override
+    public void close() {
+        // Do nothing.
+    }
+
+    /**
+     * Converts a data string and its datatype back into a {@link Value}.
+     *
+     * @param valueString - The value's data string.
+     * @param typeURI - The value's datatype.
+     * @return A URI when {@code typeURI} is {@link XMLSchema#ANYURI}; otherwise a literal
+     *   of the given datatype.
+     */
+    private static Value makeValue(final String valueString, final URI typeURI) {
+        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
+        if (typeURI.equals(XMLSchema.ANYURI)) {
+            return valueFactory.createURI(valueString);
+        } else {
+            return valueFactory.createLiteral(valueString, typeURI);
+        }
+    }
+
+    /**
+     * De/Serialize a visibility binding set using the Kryo library.
+     * TODO rename this KryoSomething and change the package.
+     */
+    private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
+        private static final Logger log = Logger.getLogger(BindingSetSerializer.class);
+
+        @Override
+        public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) {
+            // Guarded so the binding set is only rendered to a String when debug is on.
+            if (log.isDebugEnabled()) {
+                log.debug("Serializer writing visBindingSet" + visBindingSet);
+            }
+            output.writeString(visBindingSet.getVisibility());
+            // Write the binding count so the reader knows how many triples of strings follow.
+            output.writeInt(visBindingSet.size());
+            for (final Binding binding : visBindingSet) {
+                output.writeString(binding.getName());
+                // Each value travels as its data string plus its datatype URI.
+                final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
+                output.writeString(ryaValue.getData());
+                output.writeString(ryaValue.getDataType().toString());
+            }
+        }
+
+        @Override
+        public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) {
+            log.debug("Serializer reading visBindingSet");
+            final String visibility = input.readString();
+            final int bindingCount = input.readInt();
+            final ArrayList<String> namesList = new ArrayList<String>(bindingCount);
+            final ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount);
+            // Read back exactly the name / data / datatype strings that write() emitted.
+            for (int i = 0; i < bindingCount; i++) {
+                namesList.add(input.readString());
+                final String valueString = input.readString();
+                final URI type = new URIImpl(input.readString());
+                valuesList.add(makeValue(valueString, type));
+            }
+            final BindingSet bindingSet = new ListBindingSet(namesList, valuesList);
+            return new VisibilityBindingSet(bindingSet, visibility);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
new file mode 100644
index 0000000..3dbb1d8
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.fluo.api.observer.Observer;
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} method related
+ * to PCJ exporting to a Kafka topic.
+ * <p>
+ * Remember: a setting doesn't count unless it is added to params.
+ */
+public class KafkaExportParameters extends ParametersBase {
+
+    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
+
+    /**
+     * @param params - The parameters map this object reads from and writes to.
+     */
+    public KafkaExportParameters(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * @param isExportToKafka
+     *            - {@code True} if the Fluo application should export
+     *            to Kafka; otherwise {@code false}.
+     */
+    public void setExportToKafka(final boolean isExportToKafka) {
+        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should export to Kafka; otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean isExportToKafka() {
+        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
+    }
+
+    /**
+     * Add the properties to the params, NOT keeping them separate from the other params.
+     * <p>
+     * {@link Properties} is a {@code Hashtable<Object, Object>}, so its entries are not
+     * guaranteed to be strings; every key and value is stored using its {@code toString()} form.
+     * Only entries held directly by {@code producerConfig} are copied, not its defaults.
+     *
+     * @param producerConfig - The Kafka producer configuration to copy into the params. (not null)
+     */
+    public void setProducerConfig(final Properties producerConfig) {
+        // getProperty() returns null for non-String values, so read the raw entries instead.
+        // A Hashtable never holds null keys or values, so toString() is safe here.
+        for (final Map.Entry<Object, Object> entry : producerConfig.entrySet()) {
+            params.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+    }
+
+    /**
+     * @return all the params (not just kafka producer Configuration) as a {@link Properties}.
+     *         Entries whose key or value is {@code null} are left out.
+     */
+    public Properties getProducerConfig() {
+        final Properties props = new Properties();
+        for (final Map.Entry<String, String> entry : params.entrySet()) {
+            // Properties rejects null keys and values, so such entries cannot be carried over.
+            if (entry.getKey() != null && entry.getValue() != null) {
+                props.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
+        return props;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
new file mode 100644
index 0000000..362efa7
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ * <p>
+ * Each result is sent, without a key, to the topic whose name is the query's ID.
+ */
+public class KafkaResultExporter implements IncrementalResultExporter {
+    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
+
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     *
+     * @param producer
+     *            for sending result set alerts to a broker. (not null)
+     *            created and configured by {@link KafkaResultExporterFactory}
+     */
+    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
+        super();
+        checkNotNull(producer, "Producer is required.");
+        this.producer = producer;
+    }
+
+    /**
+     * Send the result to the topic using the queryId as the topic name.
+     *
+     * @param fluoTx - Used to look up the PCJ ID that is reported in the log. (not null)
+     * @param queryId - The query the result belongs to; doubles as the topic name. (not null)
+     * @param result - The result to publish. (not null)
+     * @throws ResultExportException Anything thrown while looking up the PCJ ID or handing
+     *   the record to the producer.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            log.info("out to kafka topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result);
+
+            // Send result on topic. A key can be added with ProducerRecord(String topic, K key, V value).
+            final ProducerRecord<String, VisibilityBindingSet> rec =
+                    new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result);
+
+            // send() hands back a Future that nobody waits on, so report delivery failures
+            // through the completion callback instead of dropping them silently.
+            producer.send(rec, new Callback() {
+                @Override
+                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                    if (exception != null) {
+                        log.error("A result for queryId=" + queryId + " could not be delivered to Kafka.", exception);
+                    }
+                }
+            });
+            log.debug("producer.send(rec) completed");
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
new file mode 100644
index 0000000..9418720
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Builds {@link KafkaResultExporter}s from an observer's configuration.
+ * <p/>
+ * An exporter is only built when {@link KafkaExportParameters#isExportToKafka()} is true.
+ * The observer's parameters are then handed to the Kafka producer as its configuration, so
+ * they must include the required key/values described here:
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p/>
+ * Here is a simple example. The producer is typed {@code <String, VisibilityBindingSet>},
+ * so the value serializer has to be one that accepts a {@link VisibilityBindingSet}:
+ * <pre>
+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+ * </pre>
+ *
+ * @see ProducerConfig
+ */
+public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
+    private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class);
+
+    /**
+     * @param context - Supplies the observer configuration the Kafka settings are read from.
+     * @return An exporter wrapping a newly created producer when exporting to Kafka is
+     *   enabled; otherwise absent.
+     */
+    @Override
+    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        final boolean exportEnabled = kafkaParams.isExportToKafka();
+        log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportEnabled);
+
+        // Nothing to build unless the Kafka export has been switched on.
+        if (!exportEnabled) {
+            return Optional.absent();
+        }
+
+        // Every observer parameter doubles as producer configuration.
+        final KafkaProducer<String, VisibilityBindingSet> kafkaProducer = new KafkaProducer<>(kafkaParams.getProducerConfig());
+        final IncrementalResultExporter kafkaExporter = new KafkaResultExporter(kafkaProducer);
+        return Optional.of(kafkaExporter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index bbca128..a8fc6d9 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -23,11 +23,17 @@ import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -38,11 +44,6 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
 
 /**
  * Performs incremental result exporting to the configured destinations.
@@ -69,6 +70,7 @@ public class QueryResultObserver extends AbstractObserver {
     private static final ImmutableSet<IncrementalResultExporterFactory> 
factories =
             ImmutableSet.<IncrementalResultExporterFactory>builder()
                 .add(new RyaResultExporterFactory())
+                .add(new KafkaResultExporterFactory())
                 .build();
 
     /**
@@ -90,6 +92,8 @@ public class QueryResultObserver extends AbstractObserver {
 
         for(final IncrementalResultExporterFactory builder : factories) {
             try {
+                log.debug("QueryResultObserver.init(): for each 
exportersBuilder=" + builder);
+
                 final Optional<IncrementalResultExporter> exporter = 
builder.build(context);
                 if(exporter.isPresent()) {
                     exportersBuilder.add(exporter.get());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
new file mode 100644
index 0000000..1e5adbf
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+
+/**
+ * Tests the methods of {@link KafkaExportParameters}.
+ */
+public class KafkaExportParametersTest {
+
+    @Test
+    public void writeParams() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Load some values into the params using the wrapper.
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(params);
+        kafkaParams.setExportToKafka(true);
+
+        // Ensure the params map has the expected values.
+        final Map<String, String> expectedParams = new HashMap<>();
+        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
+        assertTrue(kafkaParams.isExportToKafka());
+        assertEquals(expectedParams, params);
+
+        // now go the other way.
+        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, 
"false");
+        kafkaParams.setExportToKafka(false);
+        assertFalse(kafkaParams.isExportToKafka());
+        assertEquals(expectedParams, params);
+    }
+    @Test
+    public void writeParamsProps() {
+        final String key1 = "key1";
+        final String value1First = "value1-preserve-this";
+        final String value1Second = "value1prop";
+        final String key2 = "歌古事学週文原問業間革社。"; // 
http://generator.lorem-ipsum.info/_chinese
+        final String value2 = "良治鮮猿性社費著併病極験。";
+
+        final Map<String, String> params = new HashMap<>();
+        // Make sure export key1 is NOT kept separate from producer config key1
+        // This is a change, originally they were kept separate.
+        params.put(key1, value1First);
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(params);
+        // Load some values into the properties using the wrapper.
+        Properties props = new Properties();
+        props.put(key1, value1Second);
+        props.put(key2, value2);
+        kafkaParams.setProducerConfig(props);
+        Properties propsAfter = kafkaParams.getProducerConfig();
+        assertEquals(props, propsAfter);
+        assertEquals(params, params);
+        assertEquals("Should change identical parameters key", 
params.get(key1), value1Second);
+        assertEquals("Props should have params's key", propsAfter.get(key1), 
value1Second);
+        assertNotNull("Should have props key", params.get(key2));
+    }
+
+    @Test
+    public void notConfigured() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Ensure an unconfigured parameters map will say kafka export is 
disabled.
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(params);
+        assertFalse(kafkaParams.isExportToKafka());
+    }
+
+    @Test
+    public void testKafkaResultExporterFactory() {
+        KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
+        assertNotNull(factory);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6bb7105..b7adad6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -56,5 +56,39 @@
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-api</artifactId>
         </dependency>
+
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+          <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <classifier>test</classifier>
+<!--             <scope>test</scope> -->
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 293426f..fa9a10e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -390,11 +390,6 @@ public abstract class ITBase {
         return conf;
     }
 
-    /**
-     * Setup a Mini Fluo cluster that uses a temporary directory to store its 
data.
-     *
-     * @return A Mini Fluo cluster.
-     */
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, 
TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverSpecification> observers = new ArrayList<>();
@@ -403,14 +398,9 @@ public abstract class ITBase {
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
 
+        // Set export details for exporting from Fluo to a Rya repository and 
a subscriber queue.
         final HashMap<String, String> exportParams = new HashMap<>();
-        final RyaExportParameters ryaParams = new 
RyaExportParameters(exportParams);
-        ryaParams.setExportToRya(true);
-        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
-        ryaParams.setAccumuloInstanceName(instanceName);
-        ryaParams.setZookeeperServers(zookeepers);
-        ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
-        ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
+        setExportParameters(exportParams);
         
         // Configure the export observer to export new PCJ results to the mini 
accumulo cluster.
         final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
@@ -433,4 +423,20 @@ public abstract class ITBase {
         FluoFactory.newAdmin(config).initialize(new 
FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         return FluoFactory.newMiniFluo(config);
     }
+
+    /**
+     * Set export details for exporting from Fluo to a Rya repository and a 
subscriber queue.
+     * Override this if you have custom export destinations.
+     * 
+     * @param exportParams
+     */
+    protected void setExportParameters(final HashMap<String, String> 
exportParams) {
+        final RyaExportParameters ryaParams = new 
RyaExportParameters(exportParams);
+        ryaParams.setExportToRya(true);
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
+        ryaParams.setAccumuloInstanceName(instanceName);
+        ryaParams.setZookeeperServers(zookeepers);
+        ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
+        ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
new file mode 100644
index 0000000..10d2530
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * Performs integration tests over the Fluo application geared towards Kafka 
PCJ exporting.
+ * <p>
+ * These tests might be ignored so that they will not run as unit tests while 
building the application.
+ * Run this test from Maven command line:
+ * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration
+ * $ mvn surefire:test -Dtest=KafkaExportIT
+ */
+public class KafkaExportIT extends ITBase {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "testTopic";
+    private ZkUtils zkUtils;
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+
+
+        /**
+     * setup mini kafka and call the super to setup mini fluo
+     * 
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
+     */
+    @Override
+    public void setupMiniResources() throws Exception {
+        super.setupMiniResources();
+
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        System.out.println("setup kafka and fluo.");
+    }
+
+    /**
+     * Test kafka without rya code to make sure kafka works in this 
environment.
+     * If this test fails then its a testing environment issue, not with Rya.
+     * Source: https://github.com/asmaier/mini-kafka
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+        @Test
+        public void embeddedKafkaTest() throws InterruptedException, 
IOException {
+
+            // create topic
+            AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+
+            // setup producer
+            Properties producerProps = new Properties();
+            producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+            
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+            producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+            KafkaProducer<Integer, byte[]> producer = new 
KafkaProducer<Integer, byte[]>(producerProps);
+
+            // setup consumer
+            Properties consumerProps = new Properties();
+            consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+            consumerProps.setProperty("group.id", "group0");
+            consumerProps.setProperty("client.id", "consumer0");
+            
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+            consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            consumerProps.put("auto.offset.reset", "earliest");  // to make 
sure the consumer starts from the beginning of the topic
+            KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+            consumer.subscribe(Arrays.asList(TOPIC));
+
+            // send message
+            ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 
42, "test-message".getBytes(StandardCharsets.UTF_8));
+            producer.send(data);
+            producer.close();
+
+            // starting consumer
+        ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
records.iterator();
+            ConsumerRecord<Integer, byte[]> record = recordIterator.next();
+            System.out.printf("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value());
+            assertEquals(42, (int) record.key());
+            assertEquals("test-message", new String(record.value(), 
StandardCharsets.UTF_8));
+        consumer.close();
+    }
+
+    @Test
+    public void newResultsExportedTest() throws Exception {
+        final String sparql = "SELECT ?customer ?worker ?city " + "{ " + 
"FILTER(?customer = <http://Alice>) " + "FILTER(?city = <http://London>) " + 
"?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + 
"?worker <http://worksAt> <http://Chipotle>. " + "}";
+    
+        // Triples that will be streamed into Fluo after the PCJ has been 
created.
+        final Set<RyaStatement> streamedTriples = 
Sets.newHashSet(makeRyaStatement("http://Alice", "http://talksTo", 
"http://Bob"), makeRyaStatement("http://Bob", "http://livesIn", 
"http://London"), makeRyaStatement("http://Bob", "http://worksAt", 
"http://Chipotle"),
+                        makeRyaStatement("http://Alice", "http://talksTo", 
"http://Charlie"), makeRyaStatement("http://Charlie", "http://livesIn", 
"http://London"), makeRyaStatement("http://Charlie", "http://worksAt", 
"http://Chipotle"),
+                        makeRyaStatement("http://Alice", "http://talksTo", 
"http://David"), makeRyaStatement("http://David", "http://livesIn", 
"http://London"), makeRyaStatement("http://David", "http://worksAt", 
"http://Chipotle"),
+                        makeRyaStatement("http://Alice", "http://talksTo", 
"http://Eve"), makeRyaStatement("http://Eve", "http://livesIn", 
"http://Leeds"), makeRyaStatement("http://Eve", "http://worksAt", 
"http://Chipotle"),
+                        makeRyaStatement("http://Frank", "http://talksTo", 
"http://Alice"), makeRyaStatement("http://Frank", "http://livesIn", 
"http://London"), makeRyaStatement("http://Frank", "http://worksAt", 
"http://Chipotle"));
+    
+        // The expected results of the SPARQL query once the PCJ has been 
computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), 
new BindingImpl("city", new URIImpl("http://London"))));
+        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice")), new BindingImpl("worker", new 
URIImpl("http://Charlie")), new BindingImpl("city", new 
URIImpl("http://London"))));
+        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice")), new BindingImpl("worker", new 
URIImpl("http://David")), new BindingImpl("city", new 
URIImpl("http://London"))));
+    
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+    
+        // Tell the Fluo app to maintain the PCJ.
+        CreatePcj createPcj = new CreatePcj();
+        String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, 
pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String> absent());
+    
+        // Fetch the exported results from Accumulo once the observers finish 
working.
+        fluo.waitForObservers();
+
+        /// KafkaConsumer<Integer, byte[]> consumer = 
makeConsumer(QueryIdIsTopicName);
+        KafkaConsumer<Integer, VisibilityBindingSet> consumer = 
makeConsumer(QueryIdIsTopicName);
+
+        // starting consumer polling for messages
+        /// ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+        ConsumerRecords<Integer, VisibilityBindingSet> records = 
consumer.poll(3000);
+        /// Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
records.iterator();
+        Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator 
= records.iterator();
+        boolean allExpected = true;
+        ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null;
+        while (recordIterator.hasNext()) {
+            ConsumerRecord<Integer, VisibilityBindingSet> record = 
recordIterator.next();
+            System.out.printf("Consumed offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value().toString());
+            boolean expectedThis = expected.contains(record.value());
+            if (!expectedThis) {
+                System.out.println("This consumed record is not expected.");
+                unexpectedRecord = record;
+            }
+            allExpected = allExpected && expectedThis;
+        }
+        assertTrue("Must consume expected record: not expected:" + 
unexpectedRecord, allExpected);
+        assertNotEquals("Should get some results", 0, records.count());
+        // assertEquals(42, (int) record.key());
+        // assertEquals("test-message", new String(record.value(), 
StandardCharsets.UTF_8));
+
+    }
+
+    /**
+     * A helper function for creating a {@link BindingSet} from an array of
+     * {@link Binding}s.
+     *
+     * @param bindings
+     *            - The bindings to include in the set. (not null)
+     * @return A {@link BindingSet} holding the bindings.
+     */
+    protected static BindingSet makeBindingSet(final Binding... bindings) {
+        return new VisibilityBindingSet(ITBase.makeBindingSet(bindings));
+    }
+
+    /**
+     * @param TopicName
+     * @return
+     */
+    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(String 
TopicName) {
+        // setup consumer
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"consumer0");
+        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+        // "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); // to make sure the consumer starts from the beginning of the topic
+        /// KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+        KafkaConsumer<Integer, VisibilityBindingSet> consumer = new 
KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(TopicName));
+        return consumer;
+    }
+
+    /**
+     * Add info about the kafka queue/topic to receive the export.
+     * Call super to get the Rya parameters.
+     * 
+     * @see 
org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
+     */
+    @Override
+    protected void setExportParameters(HashMap<String, String> exportParams) {
+        // Get the defaults
+        super.setExportParameters(exportParams);
+        // Add the kafka parameters
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(true);
+        // Configure the Producer
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST 
+ ":" + BROKERPORT);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+        // "org.apache.kafka.common.serialization.StringSerializer");
+        kafkaParams.setProducerConfig(producerConfig);
+    }
+
+    /**
+     * Close all the Kafka mini server and mini-zookeeper
+     * 
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
+     */
+    @Override
+    public void shutdownMiniResources() {
+        super.shutdownMiniResources();
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.prospector/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml
index 0a3b6cf..952ab94 100644
--- a/extras/rya.prospector/pom.xml
+++ b/extras/rya.prospector/pom.xml
@@ -75,6 +75,45 @@ under the License.
                         </excludes>
                     </configuration>
                 </plugin>
+                <!--This plugin's configuration is used to store Eclipse m2e 
settings only. It has no influence on the Maven build itself.-->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        
<groupId>org.apache.maven.plugins</groupId>
+                                        
<artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.2,)</versionRange>
+                                        <goals>
+                                            <goal>compile</goal>
+                                            <goal>testCompile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.codehaus.groovy</groupId>
+                                        
<artifactId>groovy-eclipse-compiler</artifactId>
+                                        
<versionRange>[2.9.1-01,)</versionRange>
+                                        <goals>
+                                            <goal>add-groovy-build-paths</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
             </plugins>
         </pluginManagement>
         <plugins>

Reply via email to