Repository: incubator-rya Updated Branches: refs/heads/master fe7ca5d66 -> c734a4c16
RYA-128 Review issues fixed. Closes #150 Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c734a4c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c734a4c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c734a4c1 Branch: refs/heads/master Commit: c734a4c161be8fb489c692d47775f3dd313f948f Parents: fe7ca5d Author: David W. Lotts <david.lo...@parsons.com> Authored: Tue Apr 4 17:14:21 2017 -0400 Committer: David W. Lotts <david.lo...@parsons.com> Committed: Thu Apr 6 17:43:09 2017 -0400 ---------------------------------------------------------------------- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 1 - .../app/export/kafka/KafkaExportParameters.java | 86 ++++++++++ .../app/export/kafka/KafkaResultExporter.java | 75 +++++++++ .../kafka/KafkaResultExporterFactory.java | 64 ++++++++ .../KryoVisibilityBindingSetSerializer.java | 164 +++++++++++++++++++ .../app/export/rya/BindingSetSerializer.java | 137 ---------------- .../app/export/rya/KafkaExportParameters.java | 84 ---------- .../app/export/rya/KafkaResultExporter.java | 75 --------- .../export/rya/KafkaResultExporterFactory.java | 64 -------- .../fluo/app/observers/QueryResultObserver.java | 2 +- .../export/rya/KafkaExportParametersTest.java | 6 +- .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 1 - .../pcj/fluo/integration/KafkaExportIT.java | 20 ++- pom.xml | 18 ++ 14 files changed, 423 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index de51008..343713c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -99,7 +99,6 @@ 
under the License. <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> <classifier>test</classifier> -<!-- <scope>test</scope> --> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java new file mode 100644 index 0000000..347a2e2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import java.util.Map; +import java.util.Properties; + +import org.apache.fluo.api.observer.Observer; +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; + +/** + * Provides read/write functions to the parameters map that is passed into an + * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} method related + * to PCJ exporting to a kafka topic. + * Remember: it doesn't count unless it is added to params + */ + +public class KafkaExportParameters extends ParametersBase { + + public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled"; + + public KafkaExportParameters(final Map<String, String> params) { + super(params); + } + + /** + * @param isExportToKafka + * - {@code True} if the Fluo application should export + * to Kafka; otherwise {@code false}. + */ + public void setExportToKafka(final boolean isExportToKafka) { + setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka); + } + + /** + * @return {@code True} if the Fluo application should export to Kafka; otherwise + * {@code false}. Defaults to {@code false} if no value is present. + */ + public boolean isExportToKafka() { + return getBoolean(params, CONF_EXPORT_TO_KAFKA, false); + } + + /** + * Add the properties to the params, NOT keeping them separate from the other params. + * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. 
+ * + * @param producerConfig + */ + public void addAllProducerConfig(final Properties producerConfig) { + for (Object key : producerConfig.keySet().toArray()) { + Object value = producerConfig.getProperty(key.toString()); + this.params.put(key.toString(), value.toString()); + } + } + + /** + * Collect all the properties + * + * @return all the params (not just kafka producer Configuration) as a {@link Properties} + */ + public Properties listAllConfig() { + Properties props = new Properties(); + for (Object key : params.keySet().toArray()) { + Object value = params.get(key.toString()); + props.put(key.toString(), value.toString()); + } + return props; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java new file mode 100644 index 0000000..c40c5da --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * Incrementally exports SPARQL query results to Kafka topics. + */ +public class KafkaResultExporter implements IncrementalResultExporter { + private final KafkaProducer<String, VisibilityBindingSet> producer; + private static final Logger log = Logger.getLogger(KafkaResultExporter.class); + + /** + * Constructs an instance given a Kafka producer. + * + * @param producer + * for sending result set alerts to a broker. 
(not null) + * Can be created and configured by {@link KafkaResultExporterFactory} + */ + public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) { + super(); + checkNotNull(producer, "Producer is required."); + this.producer = producer; + } + + /** + * Send the results to the topic using the queryID as the topicname + */ + @Override + public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { + checkNotNull(fluoTx); + checkNotNull(queryId); + checkNotNull(result); + try { + final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; + log.trace(msg); + + // Send result on topic + ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result); + // Can add a key if you need to: + // ProducerRecord(String topic, K key, V value) + producer.send(rec); + log.debug("producer.send(rec) completed"); + + } catch (final Throwable e) { + throw new ResultExportException("A result could not be exported to Kafka.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java new file mode 100644 index 0000000..995e9d9 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.base.Optional; + +/** + * Creates instances of {@link KafkaResultExporter}. 
+ * <p/> + * Configure a Kafka producer by adding several required Key/values as described here: + * http://kafka.apache.org/documentation.html#producerconfigs + * <p/> + * Here is a simple example: + * <pre> + * Properties producerConfig = new Properties(); + * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + * </pre> + * + * @see ProducerConfig + */ +public class KafkaResultExporterFactory implements IncrementalResultExporterFactory { + private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class); + @Override + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); + if (exportParams.isExportToKafka()) { + // Setup Kafka connection + KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig()); + // Create the exporter + final IncrementalResultExporter exporter = new KafkaResultExporter(producer); + return Optional.of(exporter); + } else { + return Optional.absent(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java new file mode 100644 index 0000000..d12233a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.ListBindingSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Serialize and deserialize a VisibilityBindingSet using Kryo lib. Great for exporting results of queries. + * + */ +public class KryoVisibilityBindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> { + private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + return kryo; + }; + }; + + /** + * Deserialize a VisibilityBindingSet using Kryo lib. Exporting results of queries. 
+ * If you don't want to use Kryo, here is an alternative: + * return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); + * + * @param topic + * ignored + * @param data + * serialized bytes + * @return deserialized instance of VisibilityBindingSet + */ + @Override + public VisibilityBindingSet deserialize(String topic, byte[] data) { + KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + Input input = new Input(new ByteArrayInputStream(data)); + return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class); + } + + /** + * Ignored. Nothing to configure. + */ + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // Do nothing. + } + + /** + * Serialize a VisibilityBindingSet using Kryo lib. Exporting results of queries. + * If you don't want to use Kryo, here is an alternative: + * return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); + * + * @param topic + * ignored + * @param data + * serialize this instance + * @return Serialized form of VisibilityBindingSet + */ + @Override + public byte[] serialize(String topic, VisibilityBindingSet data) { + KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + internalSerializer.write(kryos.get(), output, data); + output.flush(); + byte[] array = baos.toByteArray(); + return array; + } + + /** + * Ignored. Nothing to close. + */ + @Override + public void close() { + // Do nothing. + } + + private static Value makeValue(final String valueString, final URI typeURI) { + // Convert the String Value into a Value. 
+ final ValueFactory valueFactory = ValueFactoryImpl.getInstance(); + if (typeURI.equals(XMLSchema.ANYURI)) { + return valueFactory.createURI(valueString); + } else { + return valueFactory.createLiteral(valueString, typeURI); + } + } + + /** + * De/Serialize a visibility binding set using the Kryo library. + * + */ + private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> { + private static final Logger log = Logger.getLogger(KryoVisibilityBindingSetSerializer.class); + @Override + public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { + log.debug("Serializer writing visBindingSet" + visBindingSet); + output.writeString(visBindingSet.getVisibility()); + // write the number count for the reader. + output.writeInt(visBindingSet.size()); + for (Binding binding : visBindingSet) { + output.writeString(binding.getName()); + final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue()); + final String valueString = ryaValue.getData(); + final URI type = ryaValue.getDataType(); + output.writeString(valueString); + output.writeString(type.toString()); + } + } + + @Override + public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) { + log.debug("Serializer reading visBindingSet"); + String visibility = input.readString(); + int bindingCount = input.readInt(); + ArrayList<String> namesList = new ArrayList<String>(bindingCount); + ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount); + for (int i = bindingCount; i > 0; i--) { + namesList.add(input.readString()); + String valueString = input.readString(); + final URI type = new URIImpl(input.readString()); + valuesList.add(makeValue(valueString, type)); + } + BindingSet bindingSet = new ListBindingSet(namesList, valuesList); + return new VisibilityBindingSet(bindingSet, visibility); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java deleted file mode 100644 index 7b35fec..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.log4j.Logger; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.resolver.RdfToRyaConversions; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.ListBindingSet; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> { - private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - return kryo; - }; - }; - - @Override - public VisibilityBindingSet deserialize(String topic, byte[] data) { - KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); - Input input = new Input(new ByteArrayInputStream(data)); - return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class); - // this is an alternative, or perhaps replace it: - // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // Do nothing. 
- } - - @Override - public byte[] serialize(String topic, VisibilityBindingSet data) { - KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - internalSerializer.write(kryos.get(), output, data); - output.flush(); - byte[] array = baos.toByteArray(); - return array; - // this is an alternative, or perhaps replace it: - // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); - } - - @Override - public void close() { - // Do nothing. - } - - private static Value makeValue(final String valueString, final URI typeURI) { - // Convert the String Value into a Value. - final ValueFactory valueFactory = ValueFactoryImpl.getInstance(); - if (typeURI.equals(XMLSchema.ANYURI)) { - return valueFactory.createURI(valueString); - } else { - return valueFactory.createLiteral(valueString, typeURI); - } - } - - /** - * De/Serialize a visibility binding set using the Kryo library. - * TODO rename this KryoSomething and change the package. - * - */ - private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> { - private static final Logger log = Logger.getLogger(BindingSetSerializer.class); - @Override - public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { - log.debug("Serializer writing visBindingSet" + visBindingSet); - output.writeString(visBindingSet.getVisibility()); - // write the number count for the reader. 
- output.writeInt(visBindingSet.size()); - for (Binding binding : visBindingSet) { - output.writeString(binding.getName()); - final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue()); - final String valueString = ryaValue.getData(); - final URI type = ryaValue.getDataType(); - output.writeString(valueString); - output.writeString(type.toString()); - } - } - - @Override - public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) { - log.debug("Serializer reading visBindingSet"); - String visibility = input.readString(); - int bindingCount = input.readInt(); - ArrayList<String> namesList = new ArrayList<String>(bindingCount); - ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount); - for (int i = bindingCount; i > 0; i--) { - namesList.add(input.readString()); - String valueString = input.readString(); - final URI type = new URIImpl(input.readString()); - valuesList.add(makeValue(valueString, type)); - } - BindingSet bindingSet = new ListBindingSet(namesList, valuesList); - return new VisibilityBindingSet(bindingSet, visibility); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java deleted file mode 100644 index 3dbb1d8..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import java.util.Map; -import java.util.Properties; - -import org.apache.fluo.api.observer.Observer; -import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; - -/** - * Provides read/write functions to the parameters map that is passed into an - * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related - * to PCJ exporting to a kafka topic. - * Remember: if doesn't count unless it is added to params - */ - -public class KafkaExportParameters extends ParametersBase { - - public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled"; - - public KafkaExportParameters(final Map<String, String> params) { - super(params); - } - - /** - * @param isExportToKafka - * - {@code True} if the Fluo application should export - * to Kafka; otherwise {@code false}. - */ - public void setExportToKafka(final boolean isExportToKafka) { - setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka); - } - - /** - * @return {@code True} if the Fluo application should export to Kafka; otherwise - * {@code false}. Defaults to {@code false} if no value is present. 
- */ - public boolean isExportToKafka() { - return getBoolean(params, CONF_EXPORT_TO_KAFKA, false); - } - - /** - * Add the properties to the params, NOT keeping them separate from the other params. - * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. - * - * @param producerConfig - */ - public void setProducerConfig(final Properties producerConfig) { - for (Object key : producerConfig.keySet().toArray()) { - Object value = producerConfig.getProperty(key.toString()); - this.params.put(key.toString(), value.toString()); - } - } - - /** - * @return all the params (not just kafka producer Configuration) as a {@link Properties} - */ - public Properties getProducerConfig() { - Properties props = new Properties(); - for (Object key : params.keySet().toArray()) { - Object value = params.get(key.toString()); - props.put(key.toString(), value.toString()); - } - return props; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java deleted file mode 100644 index 362efa7..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -/** - * Incrementally exports SPARQL query results to Kafka topics. - */ -public class KafkaResultExporter implements IncrementalResultExporter { - private final KafkaProducer<String, VisibilityBindingSet> producer; - private static final Logger log = Logger.getLogger(KafkaResultExporter.class); - - /** - * Constructs an instance given a Kafka producer. - * - * @param producer - * for sending result set alerts to a broker. 
(not null) - * created and configured by {@link KafkaResultExporterFactory} - */ - public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) { - super(); - checkNotNull(producer, "Producer is required."); - this.producer = producer; - } - - /** - * Send the results to the topic using the queryID as the topicname - */ - @Override - public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { - checkNotNull(fluoTx); - checkNotNull(queryId); - checkNotNull(result); - try { - final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; - log.info(msg); - - // Send result on topic - ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result); - // Can add a key if you need to: - // ProducerRecord(String topic, K key, V value) - producer.send(rec); - log.debug("producer.send(rec) completed"); - - } catch (final Throwable e) { - throw new ResultExportException("A result could not be exported to Kafka.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java deleted file mode 100644 index 9418720..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import org.apache.fluo.api.observer.Observer.Context; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -import com.google.common.base.Optional; - -/** - * Creates instances of {@link KafkaResultExporter}. 
- * <p/> - * Configure a Kafka producer by adding several required Key/values as described here: - * http://kafka.apache.org/documentation.html#producerconfigs - * <p/> - * Here is a simple example: - * <pre> - * Properties producerConfig = new Properties(); - * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - * </pre> - * - * @see ProducerConfig - */ -public class KafkaResultExporterFactory implements IncrementalResultExporterFactory { - private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class); - @Override - public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { - final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); - if (exportParams.isExportToKafka()) { - // Setup Kafka connection - KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.getProducerConfig()); - // Create the exporter - final IncrementalResultExporter exporter = new KafkaResultExporter(producer); - return Optional.of(exporter); - } else { - return Optional.absent(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index a8fc6d9..1238c18 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -33,7 +33,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java index 1e5adbf..74193cf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; import org.junit.Test; /** @@ -71,8 +73,8 @@ public class KafkaExportParametersTest { Properties props = new Properties(); props.put(key1, value1Second); props.put(key2, value2); - kafkaParams.setProducerConfig(props); - Properties propsAfter = kafkaParams.getProducerConfig(); + kafkaParams.addAllProducerConfig(props); + Properties propsAfter = kafkaParams.listAllConfig(); assertEquals(props, propsAfter); assertEquals(params, params); assertEquals("Should change identical parameters key", params.get(key1), value1Second); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index b7adad6..ab99ecd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -79,7 +79,6 @@ <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> <classifier>test</classifier> -<!-- <scope>test</scope> --> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 10d2530..5e12fac 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -33,6 +33,8 @@ import java.util.Properties; import java.util.Set; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -44,7 +46,7 @@ import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -77,6 +79,7 @@ import kafka.zk.EmbeddedZookeeper; * $ mvn surefire:test -Dtest=KafkaExportIT */ public class KafkaExportIT extends ITBase { + private static final Log logger = LogFactory.getLog(KafkaExportIT.class); private static final String ZKHOST = "127.0.0.1"; private static final String BROKERHOST = "127.0.0.1"; @@ -112,7 +115,7 @@ public class KafkaExportIT extends ITBase { Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); - System.out.println("setup kafka and fluo."); + logger.trace("setup kafka and fluo."); } /** @@ -125,7 +128,6 @@ public class KafkaExportIT extends ITBase { */ @Test 
public void embeddedKafkaTest() throws InterruptedException, IOException { - // create topic AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); @@ -157,7 +159,7 @@ public class KafkaExportIT extends ITBase { assertEquals(1, records.count()); Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); ConsumerRecord<Integer, byte[]> record = recordIterator.next(); - System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); + logger.trace(String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value())); assertEquals(42, (int) record.key()); assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); consumer.close(); @@ -206,10 +208,10 @@ public class KafkaExportIT extends ITBase { ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null; while (recordIterator.hasNext()) { ConsumerRecord<Integer, VisibilityBindingSet> record = recordIterator.next(); - System.out.printf("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString()); + logger.trace(String.format("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString())); boolean expectedThis = expected.contains(record.value()); if (!expectedThis) { - System.out.println("This consumed record is not expected."); + logger.trace("This consumed record is not expected."); unexpectedRecord = record; } allExpected = allExpected && expectedThis; @@ -244,7 +246,7 @@ public class KafkaExportIT extends ITBase { consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); // "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic /// KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); @@ -270,9 +272,9 @@ public class KafkaExportIT extends ITBase { Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); // "org.apache.kafka.common.serialization.StringSerializer"); - kafkaParams.setProducerConfig(producerConfig); + kafkaParams.addAllProducerConfig(producerConfig); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c734a4c1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e635e25..3fa35ba 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ under the License. <jsr305.version>1.3.9-1</jsr305.version> <jcip.version>1.0-1</jcip.version> <findbugs.plugin.version>3.0.4</findbugs.plugin.version> + <kafka.version>0.10.1.0</kafka.version> </properties> <dependencyManagement> <dependencies> @@ -666,6 +667,23 @@ under the License. 
</exclusion> </exclusions> </dependency> + <!-- Kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> </dependencies> </dependencyManagement>