Repository: incubator-rya
Updated Branches:
  refs/heads/master fe7ca5d66 -> c734a4c16


RYA-128 Review issues fixed. Closes #150


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c734a4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c734a4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c734a4c1

Branch: refs/heads/master
Commit: c734a4c161be8fb489c692d47775f3dd313f948f
Parents: fe7ca5d
Author: David W. Lotts <david.lo...@parsons.com>
Authored: Tue Apr 4 17:14:21 2017 -0400
Committer: David W. Lotts <david.lo...@parsons.com>
Committed: Thu Apr 6 17:43:09 2017 -0400

----------------------------------------------------------------------
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |   1 -
 .../app/export/kafka/KafkaExportParameters.java |  86 ++++++++++
 .../app/export/kafka/KafkaResultExporter.java   |  75 +++++++++
 .../kafka/KafkaResultExporterFactory.java       |  64 ++++++++
 .../KryoVisibilityBindingSetSerializer.java     | 164 +++++++++++++++++++
 .../app/export/rya/BindingSetSerializer.java    | 137 ----------------
 .../app/export/rya/KafkaExportParameters.java   |  84 ----------
 .../app/export/rya/KafkaResultExporter.java     |  75 ---------
 .../export/rya/KafkaResultExporterFactory.java  |  64 --------
 .../fluo/app/observers/QueryResultObserver.java |   2 +-
 .../export/rya/KafkaExportParametersTest.java   |   6 +-
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |   1 -
 .../pcj/fluo/integration/KafkaExportIT.java     |  20 ++-
 pom.xml                                         |  18 ++
 14 files changed, 423 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index de51008..343713c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -99,7 +99,6 @@ under the License.
             <artifactId>kafka_2.11</artifactId>
             <version>0.10.1.0</version>
             <classifier>test</classifier>
-<!--             <scope>test</scope> -->
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
new file mode 100644
index 0000000..347a2e2
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.fluo.api.observer.Observer;
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
+ * to PCJ exporting to a kafka topic.
+ * Remember: if doesn't count unless it is added to params
+ */
+
+public class KafkaExportParameters extends ParametersBase {
+
+    public static final String CONF_EXPORT_TO_KAFKA = 
"pcj.fluo.export.kafka.enabled";
+
+    public KafkaExportParameters(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * @param isExportToKafka
+     *            - {@code True} if the Fluo application should export
+     *            to Kafka; otherwise {@code false}.
+     */
+    public void setExportToKafka(final boolean isExportToKafka) {
+        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should export to Kafka; 
otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean isExportToKafka() {
+        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
+    }
+
+    /**
+     * Add the properties to the params, NOT keeping them separate from the 
other params.
+     * Guaranteed by Properties: Each key and its corresponding value in the 
property list is a string.
+     * 
+     * @param producerConfig
+     */
+    public void addAllProducerConfig(final Properties producerConfig) {
+        for (Object key : producerConfig.keySet().toArray()) {
+            Object value = producerConfig.getProperty(key.toString());
+            this.params.put(key.toString(), value.toString());
+        }
+    }
+
+    /**
+     * Collect all the properties
+     * 
+     * @return all the params (not just kafka producer Configuration) as a 
{@link Properties}
+     */
+    public Properties listAllConfig() {
+        Properties props = new Properties();
+        for (Object key : params.keySet().toArray()) {
+            Object value = params.get(key.toString());
+            props.put(key.toString(), value.toString());
+        }
+        return props;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
new file mode 100644
index 0000000..c40c5da
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ */
+public class KafkaResultExporter implements IncrementalResultExporter {
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+    private static final Logger log = 
Logger.getLogger(KafkaResultExporter.class);
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     * 
+     * @param producer
+     *            for sending result set alerts to a broker. (not null)
+     *            Can be created and configured by {@link 
KafkaResultExporterFactory}
+     */
+    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> 
producer) {
+        super();
+        checkNotNull(producer, "Producer is required.");
+        this.producer = producer;
+    }
+
+    /**
+     * Send the results to the topic using the queryID as the topicname
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, 
final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, 
FluoQueryColumns.RYA_PCJ_ID);
+            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" 
+ pcjId + " result=" + result;
+            log.trace(msg);
+
+            // Send result on topic
+            ProducerRecord<String, VisibilityBindingSet> rec = new 
ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* 
value= */ result);
+            // Can add a key if you need to:
+            // ProducerRecord(String topic, K key, V value)
+            producer.send(rec);
+            log.debug("producer.send(rec) completed");
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to 
Kafka.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
new file mode 100644
index 0000000..995e9d9
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Creates instances of {@link KafkaResultExporter}.
+ * <p/>
+ * Configure a Kafka producer by adding several required Key/values as 
described here:
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p/>
+ * Here is a simple example:
+ * <pre>
+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+ * </pre>
+ * 
+ * @see ProducerConfig
+ */
+public class KafkaResultExporterFactory implements 
IncrementalResultExporterFactory {
+    private static final Logger log = 
Logger.getLogger(KafkaResultExporterFactory.class);
+    @Override
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new 
KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaResultExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (exportParams.isExportToKafka()) {
+            // Setup Kafka connection
+            KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
+            // Create the exporter
+            final IncrementalResultExporter exporter = new 
KafkaResultExporter(producer);
+            return Optional.of(exporter);
+        } else {
+            return Optional.absent();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
new file mode 100644
index 0000000..d12233a
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.ListBindingSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serialize and deserialize a VisibilityBindingSet using Kyro lib. Great for 
exporting results of queries.
+ *
+ */
+public class KryoVisibilityBindingSetSerializer implements 
Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
+    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            return kryo;
+        };
+    };
+
+    /**
+     * Deserialize a VisibilityBindingSet using Kyro lib. Exporting results of 
queries.
+     * If you don't want to use Kyro, here is an alternative:
+     * return (new VisibilityBindingSetStringConverter()).convert(new 
String(data, StandardCharsets.UTF_8), null);
+     * 
+     * @param topic
+     *            ignored
+     * @param data
+     *            serialized bytes
+     * @return deserialized instance of VisibilityBindingSet
+     */
+    @Override
+    public VisibilityBindingSet deserialize(String topic, byte[] data) {
+        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
+        Input input = new Input(new ByteArrayInputStream(data));
+        return internalSerializer.read(kryos.get(), input, 
VisibilityBindingSet.class);
+    }
+
+    /**
+     * Ignored. Nothing to configure.
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // Do nothing.
+    }
+
+    /**
+     * Serialize a VisibilityBindingSet using Kyro lib. Exporting results of 
queries.
+     * If you don't want to use Kyro, here is an alternative:
+     * return (new VisibilityBindingSetStringConverter()).convert(data, 
null).getBytes(StandardCharsets.UTF_8);
+     * 
+     * @param topic
+     *            ignored
+     * @param data
+     *            serialize this instance
+     * @return Serialized form of VisibilityBindingSet
+     */
+    @Override
+    public byte[] serialize(String topic, VisibilityBindingSet data) {
+        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        Output output = new Output(baos);
+        internalSerializer.write(kryos.get(), output, data);
+        output.flush();
+        byte[] array = baos.toByteArray();
+        return array;
+    }
+
+    /**
+     * Ignored. Nothing to close.
+     */
+    @Override
+    public void close() {
+        // Do nothing.
+    }
+
+    private static Value makeValue(final String valueString, final URI 
typeURI) {
+        // Convert the String Value into a Value.
+        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
+        if (typeURI.equals(XMLSchema.ANYURI)) {
+            return valueFactory.createURI(valueString);
+        } else {
+            return valueFactory.createLiteral(valueString, typeURI);
+        }
+    }
+
+    /**
+     * De/Serialize a visibility binding set using the Kryo library.
+     *
+     */
+    private static class KryoInternalSerializer extends 
com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
+        private static final Logger log = 
Logger.getLogger(KryoVisibilityBindingSetSerializer.class);
+        @Override
+        public void write(Kryo kryo, Output output, VisibilityBindingSet 
visBindingSet) {
+            log.debug("Serializer writing visBindingSet" + visBindingSet);
+            output.writeString(visBindingSet.getVisibility());
+            // write the number count for the reader.
+            output.writeInt(visBindingSet.size());
+            for (Binding binding : visBindingSet) {
+                output.writeString(binding.getName());
+                final RyaType ryaValue = 
RdfToRyaConversions.convertValue(binding.getValue());
+                final String valueString = ryaValue.getData();
+                final URI type = ryaValue.getDataType();
+                output.writeString(valueString);
+                output.writeString(type.toString());
+            }
+        }
+
+        @Override
+        public VisibilityBindingSet read(Kryo kryo, Input input, 
Class<VisibilityBindingSet> aClass) {
+            log.debug("Serializer reading visBindingSet");
+            String visibility = input.readString();
+            int bindingCount = input.readInt();
+            ArrayList<String> namesList = new ArrayList<String>(bindingCount);
+            ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount);
+            for (int i = bindingCount; i > 0; i--) {
+                namesList.add(input.readString());
+                String valueString = input.readString();
+                final URI type = new URIImpl(input.readString());
+                valuesList.add(makeValue(valueString, type));
+            }
+            BindingSet bindingSet = new ListBindingSet(namesList, valuesList);
+            return new VisibilityBindingSet(bindingSet, visibility);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
deleted file mode 100644
index 7b35fec..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.ListBindingSet;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, 
Deserializer<VisibilityBindingSet> {
-    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
-        @Override
-        protected Kryo initialValue() {
-            Kryo kryo = new Kryo();
-            return kryo;
-        };
-    };
-
-    @Override
-    public VisibilityBindingSet deserialize(String topic, byte[] data) {
-        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
-        Input input = new Input(new ByteArrayInputStream(data));
-        return internalSerializer.read(kryos.get(), input, 
VisibilityBindingSet.class);
-        // this is an alternative, or perhaps replace it:
-        // return (new VisibilityBindingSetStringConverter()).convert(new 
String(data, StandardCharsets.UTF_8), null);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // Do nothing.
-    }
-
-    @Override
-    public byte[] serialize(String topic, VisibilityBindingSet data) {
-        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Output output = new Output(baos);
-        internalSerializer.write(kryos.get(), output, data);
-        output.flush();
-        byte[] array = baos.toByteArray();
-        return array;
-        // this is an alternative, or perhaps replace it:
-        // return (new VisibilityBindingSetStringConverter()).convert(data, 
null).getBytes(StandardCharsets.UTF_8);
-    }
-
-    @Override
-    public void close() {
-        // Do nothing.
-    }
-
-    private static Value makeValue(final String valueString, final URI 
typeURI) {
-        // Convert the String Value into a Value.
-        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
-        if (typeURI.equals(XMLSchema.ANYURI)) {
-            return valueFactory.createURI(valueString);
-        } else {
-            return valueFactory.createLiteral(valueString, typeURI);
-        }
-    }
-
-    /**
-     * De/Serialize a visibility binding set using the Kryo library.
-     * TODO rename this KryoSomething and change the package.
-     *
-     */
-    private static class KryoInternalSerializer extends 
com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
-        private static final Logger log = 
Logger.getLogger(BindingSetSerializer.class);
-        @Override
-        public void write(Kryo kryo, Output output, VisibilityBindingSet 
visBindingSet) {
-            log.debug("Serializer writing visBindingSet" + visBindingSet);
-            output.writeString(visBindingSet.getVisibility());
-            // write the number count for the reader.
-            output.writeInt(visBindingSet.size());
-            for (Binding binding : visBindingSet) {
-                output.writeString(binding.getName());
-                final RyaType ryaValue = 
RdfToRyaConversions.convertValue(binding.getValue());
-                final String valueString = ryaValue.getData();
-                final URI type = ryaValue.getDataType();
-                output.writeString(valueString);
-                output.writeString(type.toString());
-            }
-        }
-
-        @Override
-        public VisibilityBindingSet read(Kryo kryo, Input input, 
Class<VisibilityBindingSet> aClass) {
-            log.debug("Serializer reading visBindingSet");
-            String visibility = input.readString();
-            int bindingCount = input.readInt();
-            ArrayList<String> namesList = new ArrayList<String>(bindingCount);
-            ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount);
-            for (int i = bindingCount; i > 0; i--) {
-                namesList.add(input.readString());
-                String valueString = input.readString();
-                final URI type = new URIImpl(input.readString());
-                valuesList.add(makeValue(valueString, type));
-            }
-            BindingSet bindingSet = new ListBindingSet(namesList, valuesList);
-            return new VisibilityBindingSet(bindingSet, visibility);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
deleted file mode 100644
index 3dbb1d8..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.fluo.api.observer.Observer;
-import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
-
-/**
- * Provides read/write functions to the parameters map that is passed into an
- * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
- * to PCJ exporting to a kafka topic.
- * Remember: if doesn't count unless it is added to params
- */
-
-public class KafkaExportParameters extends ParametersBase {
-
-    public static final String CONF_EXPORT_TO_KAFKA = 
"pcj.fluo.export.kafka.enabled";
-
-    public KafkaExportParameters(final Map<String, String> params) {
-        super(params);
-    }
-
-    /**
-     * @param isExportToKafka
-     *            - {@code True} if the Fluo application should export
-     *            to Kafka; otherwise {@code false}.
-     */
-    public void setExportToKafka(final boolean isExportToKafka) {
-        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
-    }
-
-    /**
-     * @return {@code True} if the Fluo application should export to Kafka; 
otherwise
-     *         {@code false}. Defaults to {@code false} if no value is present.
-     */
-    public boolean isExportToKafka() {
-        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
-    }
-
-    /**
-     * Add the properties to the params, NOT keeping them separate from the 
other params.
-     * Guaranteed by Properties: Each key and its corresponding value in the 
property list is a string.
-     * 
-     * @param producerConfig
-     */
-    public void setProducerConfig(final Properties producerConfig) {
-        for (Object key : producerConfig.keySet().toArray()) {
-            Object value = producerConfig.getProperty(key.toString());
-            this.params.put(key.toString(), value.toString());
-        }
-    }
-
-    /**
-     * @return all the params (not just kafka producer Configuration) as a 
{@link Properties}
-     */
-    public Properties getProducerConfig() {
-        Properties props = new Properties();
-        for (Object key : params.keySet().toArray()) {
-            Object value = params.get(key.toString());
-            props.put(key.toString(), value.toString());
-        }
-        return props;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
deleted file mode 100644
index 362efa7..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Kafka topics.
- */
-public class KafkaResultExporter implements IncrementalResultExporter {
-    private final KafkaProducer<String, VisibilityBindingSet> producer;
-    private static final Logger log = 
Logger.getLogger(KafkaResultExporter.class);
-
-    /**
-     * Constructs an instance given a Kafka producer.
-     * 
-     * @param producer
-     *            for sending result set alerts to a broker. (not null)
-     *            created and configured by {@link KafkaResultExporterFactory}
-     */
-    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> 
producer) {
-        super();
-        checkNotNull(producer, "Producer is required.");
-        this.producer = producer;
-    }
-
-    /**
-     * Send the results to the topic using the queryID as the topicname
-     */
-    @Override
-    public void export(final TransactionBase fluoTx, final String queryId, 
final VisibilityBindingSet result) throws ResultExportException {
-        checkNotNull(fluoTx);
-        checkNotNull(queryId);
-        checkNotNull(result);
-        try {
-            final String pcjId = fluoTx.gets(queryId, 
FluoQueryColumns.RYA_PCJ_ID);
-            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" 
+ pcjId + " result=" + result;
-            log.info(msg);
-
-            // Send result on topic
-            ProducerRecord<String, VisibilityBindingSet> rec = new 
ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* 
value= */ result);
-            // Can add a key if you need to:
-            // ProducerRecord(String topic, K key, V value)
-            producer.send(rec);
-            log.debug("producer.send(rec) completed");
-
-        } catch (final Throwable e) {
-            throw new ResultExportException("A result could not be exported to 
Kafka.", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
deleted file mode 100644
index 9418720..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.base.Optional;
-
-/**
- * Creates instances of {@link KafkaResultExporter}.
- * <p/>
- * Configure a Kafka producer by adding several required Key/values as 
described here:
- * http://kafka.apache.org/documentation.html#producerconfigs
- * <p/>
- * Here is a simple example:
- * <pre>
- *     Properties producerConfig = new Properties();
- *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
- *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
- *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
- * </pre>
- * 
- * @see ProducerConfig
- */
-public class KafkaResultExporterFactory implements 
IncrementalResultExporterFactory {
-    private static final Logger log = 
Logger.getLogger(KafkaResultExporterFactory.class);
-    @Override
-    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
-        final KafkaExportParameters exportParams = new 
KafkaExportParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaResultExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.isExportToKafka());
-        if (exportParams.isExportToKafka()) {
-            // Setup Kafka connection
-            KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<String, VisibilityBindingSet>(exportParams.getProducerConfig());
-            // Create the exporter
-            final IncrementalResultExporter exporter = new 
KafkaResultExporter(producer);
-            return Optional.of(exporter);
-        } else {
-            return Optional.absent();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index a8fc6d9..1238c18 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -33,7 +33,7 @@ import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index 1e5adbf..74193cf 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
 import org.junit.Test;
 
 /**
@@ -71,8 +73,8 @@ public class KafkaExportParametersTest {
         Properties props = new Properties();
         props.put(key1, value1Second);
         props.put(key2, value2);
-        kafkaParams.setProducerConfig(props);
-        Properties propsAfter = kafkaParams.getProducerConfig();
+        kafkaParams.addAllProducerConfig(props);
+        Properties propsAfter = kafkaParams.listAllConfig();
         assertEquals(props, propsAfter);
         assertEquals(params, params);
         assertEquals("Should change identical parameters key", 
params.get(key1), value1Second);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index b7adad6..ab99ecd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -79,7 +79,6 @@
             <artifactId>kafka_2.11</artifactId>
             <version>0.10.1.0</version>
             <classifier>test</classifier>
-<!--             <scope>test</scope> -->
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 10d2530..5e12fac 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -33,6 +33,8 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -44,7 +46,7 @@ import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -77,6 +79,7 @@ import kafka.zk.EmbeddedZookeeper;
  * $ mvn surefire:test -Dtest=KafkaExportIT
  */
 public class KafkaExportIT extends ITBase {
+    private static final Log logger = LogFactory.getLog(KafkaExportIT.class);
 
     private static final String ZKHOST = "127.0.0.1";
     private static final String BROKERHOST = "127.0.0.1";
@@ -112,7 +115,7 @@ public class KafkaExportIT extends ITBase {
         Time mock = new MockTime();
         kafkaServer = TestUtils.createServer(config, mock);
 
-        System.out.println("setup kafka and fluo.");
+        logger.trace("setup kafka and fluo.");
     }
 
     /**
@@ -125,7 +128,6 @@ public class KafkaExportIT extends ITBase {
      */
         @Test
         public void embeddedKafkaTest() throws InterruptedException, 
IOException {
-
             // create topic
             AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
 
@@ -157,7 +159,7 @@ public class KafkaExportIT extends ITBase {
             assertEquals(1, records.count());
             Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
records.iterator();
             ConsumerRecord<Integer, byte[]> record = recordIterator.next();
-            System.out.printf("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value());
+        logger.trace(String.format("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value()));
             assertEquals(42, (int) record.key());
             assertEquals("test-message", new String(record.value(), 
StandardCharsets.UTF_8));
         consumer.close();
@@ -206,10 +208,10 @@ public class KafkaExportIT extends ITBase {
         ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null;
         while (recordIterator.hasNext()) {
             ConsumerRecord<Integer, VisibilityBindingSet> record = 
recordIterator.next();
-            System.out.printf("Consumed offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value().toString());
+            logger.trace(String.format("Consumed offset = %d, key = %s, value 
= %s", record.offset(), record.key(), record.value().toString()));
             boolean expectedThis = expected.contains(record.value());
             if (!expectedThis) {
-                System.out.println("This consumed record is not expected.");
+                logger.trace("This consumed record is not expected.");
                 unexpectedRecord = record;
             }
             allExpected = allExpected && expectedThis;
@@ -244,7 +246,7 @@ public class KafkaExportIT extends ITBase {
         consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"consumer0");
         
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
         // "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); // to make sure the consumer starts from the beginning of the topic
         /// KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
@@ -270,9 +272,9 @@ public class KafkaExportIT extends ITBase {
         Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST 
+ ":" + BROKERPORT);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
         // "org.apache.kafka.common.serialization.StringSerializer");
-        kafkaParams.setProducerConfig(producerConfig);
+        kafkaParams.addAllProducerConfig(producerConfig);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e635e25..3fa35ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@ under the License.
         <jsr305.version>1.3.9-1</jsr305.version>
         <jcip.version>1.0-1</jcip.version>
         <findbugs.plugin.version>3.0.4</findbugs.plugin.version>
+        <kafka.version>0.10.1.0</kafka.version>
     </properties>
     <dependencyManagement>
         <dependencies>
@@ -666,6 +667,23 @@ under the License.
                     </exclusion>
                 </exclusions>
             </dependency>
+            <!-- Kafka -->
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka_2.11</artifactId>
+                <version>${kafka.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>slf4j-log4j12</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

Reply via email to