Repository: incubator-rya Updated Branches: refs/heads/master 529f2d595 -> b03b18938
RYA-128 closes #121; closes RYA-128 trigger service to Kafka. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b03b1893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b03b1893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b03b1893 Branch: refs/heads/master Commit: b03b18938c55ff3e896b3d9969a7b5dc54753129 Parents: 529f2d5 Author: David W. Lotts <david.lo...@parsons.com> Authored: Tue Nov 1 17:37:07 2016 -0400 Committer: pujav65 <puja...@gmail.com> Committed: Tue Apr 4 08:55:38 2017 -0400 ---------------------------------------------------------------------- .../rya/indexing/pcj/fluo/api/CreatePcj.java | 18 +- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 48 ++- .../app/export/rya/BindingSetSerializer.java | 137 +++++++++ .../app/export/rya/KafkaExportParameters.java | 84 ++++++ .../app/export/rya/KafkaResultExporter.java | 75 +++++ .../export/rya/KafkaResultExporterFactory.java | 64 ++++ .../fluo/app/observers/QueryResultObserver.java | 14 +- .../export/rya/KafkaExportParametersTest.java | 97 +++++++ .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 34 +++ .../apache/rya/indexing/pcj/fluo/ITBase.java | 30 +- .../pcj/fluo/integration/KafkaExportIT.java | 290 +++++++++++++++++++ extras/rya.prospector/pom.xml | 39 +++ 12 files changed, 905 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 6567371..d29191d 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -132,12 +132,12 @@ public class CreatePcj { * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}. * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}. */ - public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, - final Connector accumulo, String ryaInstance ) - throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); - requireNonNull(fluo); + public String withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, + final Connector accumulo, String ryaInstance ) + throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); requireNonNull(accumulo); requireNonNull(ryaInstance); @@ -162,13 +162,16 @@ public class CreatePcj { final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + // return queryId to the caller for later monitoring from the export. + String queryId = null; + try (Transaction tx = fluo.newTransaction()) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); // The results of the query are eventually exported to an instance // of Rya, so store the Rya ID for the PCJ. 
- final String queryId = fluoQuery.getQueryMetadata().getNodeId(); + queryId = fluoQuery.getQueryMetadata().getNodeId(); tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); @@ -206,6 +209,7 @@ public class CreatePcj { writeBatch(fluo, triplesBatch); triplesBatch.clear(); } + return queryId; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index fd2e582..de51008 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -33,7 +33,9 @@ under the License. A Fluo implementation of Rya Precomputed Join Indexing. This module produces a jar that may be executed by the 'fluo' command line tool as a YARN job. </description> - + <properties> + <kryo.version>3.0.3</kryo.version> + </properties> <dependencies> <!-- Rya Runtime Dependencies. --> <dependency> @@ -62,6 +64,50 @@ under the License. </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> + + <!-- Testing dependencies. 
--> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + <classifier>test</classifier> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <classifier>test</classifier> +<!-- <scope>test</scope> --> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <!-- Testing dependencies. --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java new file mode 100644 index 0000000..7b35fec --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.ListBindingSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> { + private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + return kryo; + }; + }; + + @Override + public VisibilityBindingSet deserialize(String topic, byte[] data) { + KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + Input input = new Input(new ByteArrayInputStream(data)); + return internalSerializer.read(kryos.get(), 
input, VisibilityBindingSet.class); + // this is an alternative, or perhaps replace it: + // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // Do nothing. + } + + @Override + public byte[] serialize(String topic, VisibilityBindingSet data) { + KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + internalSerializer.write(kryos.get(), output, data); + output.flush(); + byte[] array = baos.toByteArray(); + return array; + // this is an alternative, or perhaps replace it: + // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); + } + + @Override + public void close() { + // Do nothing. + } + + private static Value makeValue(final String valueString, final URI typeURI) { + // Convert the String Value into a Value. + final ValueFactory valueFactory = ValueFactoryImpl.getInstance(); + if (typeURI.equals(XMLSchema.ANYURI)) { + return valueFactory.createURI(valueString); + } else { + return valueFactory.createLiteral(valueString, typeURI); + } + } + + /** + * De/Serialize a visibility binding set using the Kryo library. + * TODO rename this KryoSomething and change the package. + * + */ + private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> { + private static final Logger log = Logger.getLogger(BindingSetSerializer.class); + @Override + public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { + log.debug("Serializer writing visBindingSet" + visBindingSet); + output.writeString(visBindingSet.getVisibility()); + // write the number count for the reader. 
+ output.writeInt(visBindingSet.size()); + for (Binding binding : visBindingSet) { + output.writeString(binding.getName()); + final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue()); + final String valueString = ryaValue.getData(); + final URI type = ryaValue.getDataType(); + output.writeString(valueString); + output.writeString(type.toString()); + } + } + + @Override + public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) { + log.debug("Serializer reading visBindingSet"); + String visibility = input.readString(); + int bindingCount = input.readInt(); + ArrayList<String> namesList = new ArrayList<String>(bindingCount); + ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount); + for (int i = bindingCount; i > 0; i--) { + namesList.add(input.readString()); + String valueString = input.readString(); + final URI type = new URIImpl(input.readString()); + valuesList.add(makeValue(valueString, type)); + } + BindingSet bindingSet = new ListBindingSet(namesList, valuesList); + return new VisibilityBindingSet(bindingSet, visibility); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java new file mode 100644 index 0000000..3dbb1d8 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.util.Map; +import java.util.Properties; + +import org.apache.fluo.api.observer.Observer; +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; + +/** + * Provides read/write functions to the parameters map that is passed into an + * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} method related + * to PCJ exporting to a kafka topic. + * Remember: it doesn't count unless it is added to params + */ + +public class KafkaExportParameters extends ParametersBase { + + public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled"; + + public KafkaExportParameters(final Map<String, String> params) { + super(params); + } + + /** + * @param isExportToKafka + * - {@code True} if the Fluo application should export + * to Kafka; otherwise {@code false}. + */ + public void setExportToKafka(final boolean isExportToKafka) { + setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka); + } + + /** + * @return {@code True} if the Fluo application should export to Kafka; otherwise + * {@code false}. Defaults to {@code false} if no value is present. 
+ */ + public boolean isExportToKafka() { + return getBoolean(params, CONF_EXPORT_TO_KAFKA, false); + } + + /** + * Add the properties to the params, NOT keeping them separate from the other params. + * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. + * + * @param producerConfig + */ + public void setProducerConfig(final Properties producerConfig) { + for (Object key : producerConfig.keySet().toArray()) { + Object value = producerConfig.getProperty(key.toString()); + this.params.put(key.toString(), value.toString()); + } + } + + /** + * @return all the params (not just kafka producer Configuration) as a {@link Properties} + */ + public Properties getProducerConfig() { + Properties props = new Properties(); + for (Object key : params.keySet().toArray()) { + Object value = params.get(key.toString()); + props.put(key.toString(), value.toString()); + } + return props; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java new file mode 100644 index 0000000..362efa7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * Incrementally exports SPARQL query results to Kafka topics. + */ +public class KafkaResultExporter implements IncrementalResultExporter { + private final KafkaProducer<String, VisibilityBindingSet> producer; + private static final Logger log = Logger.getLogger(KafkaResultExporter.class); + + /** + * Constructs an instance given a Kafka producer. + * + * @param producer + * for sending result set alerts to a broker. 
(not null) + * created and configured by {@link KafkaResultExporterFactory} + */ + public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) { + super(); + checkNotNull(producer, "Producer is required."); + this.producer = producer; + } + + /** + * Send the results to the topic using the queryID as the topicname + */ + @Override + public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { + checkNotNull(fluoTx); + checkNotNull(queryId); + checkNotNull(result); + try { + final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; + log.info(msg); + + // Send result on topic + ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result); + // Can add a key if you need to: + // ProducerRecord(String topic, K key, V value) + producer.send(rec); + log.debug("producer.send(rec) completed"); + + } catch (final Throwable e) { + throw new ResultExportException("A result could not be exported to Kafka.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java new file mode 100644 index 0000000..9418720 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.base.Optional; + +/** + * Creates instances of {@link KafkaResultExporter}. 
+ * <p/> + * Configure a Kafka producer by adding several required Key/values as described here: + * http://kafka.apache.org/documentation.html#producerconfigs + * <p/> + * Here is a simple example: + * <pre> + * Properties producerConfig = new Properties(); + * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + * </pre> + * + * @see ProducerConfig + */ +public class KafkaResultExporterFactory implements IncrementalResultExporterFactory { + private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class); + @Override + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); + if (exportParams.isExportToKafka()) { + // Setup Kafka connection + KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.getProducerConfig()); + // Create the exporter + final IncrementalResultExporter exporter = new KafkaResultExporter(producer); + return Optional.of(exporter); + } else { + return Optional.absent(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index bbca128..a8fc6d9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -23,11 +23,17 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO import java.util.HashMap; import java.util.Map; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -38,11 +44,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.rya.accumulo.utils.VisibilitySimplifier; /** * Performs incremental result exporting 
to the configured destinations. @@ -69,6 +70,7 @@ public class QueryResultObserver extends AbstractObserver { private static final ImmutableSet<IncrementalResultExporterFactory> factories = ImmutableSet.<IncrementalResultExporterFactory>builder() .add(new RyaResultExporterFactory()) + .add(new KafkaResultExporterFactory()) .build(); /** @@ -90,6 +92,8 @@ public class QueryResultObserver extends AbstractObserver { for(final IncrementalResultExporterFactory builder : factories) { try { + log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + final Optional<IncrementalResultExporter> exporter = builder.build(context); if(exporter.isPresent()) { exportersBuilder.add(exporter.get()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java new file mode 100644 index 0000000..1e5adbf --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.junit.Test; + +/** + * Tests the methods of {@link KafkaExportParameters}. + */ +public class KafkaExportParametersTest { + + @Test + public void writeParams() { + final Map<String, String> params = new HashMap<>(); + + // Load some values into the params using the wrapper. + final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + kafkaParams.setExportToKafka(true); + + // Ensure the params map has the expected values. + final Map<String, String> expectedParams = new HashMap<>(); + expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true"); + assertTrue(kafkaParams.isExportToKafka()); + assertEquals(expectedParams, params); + + // now go the other way. 
+ expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false"); + kafkaParams.setExportToKafka(false); + assertFalse(kafkaParams.isExportToKafka()); + assertEquals(expectedParams, params); + } + @Test + public void writeParamsProps() { + final String key1 = "key1"; + final String value1First = "value1-preserve-this"; + final String value1Second = "value1prop"; + final String key2 = "æå¤äºå¦é±æååæ¥éé©ç¤¾ã"; // http://generator.lorem-ipsum.info/_chinese + final String value2 = "è¯æ²»é®®ç¿æ§ç¤¾è²»èä½µç 極é¨ã"; + + final Map<String, String> params = new HashMap<>(); + // Make sure export key1 is NOT kept separate from producer config key1 + // This is a change, originally they were kept separate. + params.put(key1, value1First); + final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + // Load some values into the properties using the wrapper. + Properties props = new Properties(); + props.put(key1, value1Second); + props.put(key2, value2); + kafkaParams.setProducerConfig(props); + Properties propsAfter = kafkaParams.getProducerConfig(); + assertEquals(props, propsAfter); + assertEquals(params, params); + assertEquals("Should change identical parameters key", params.get(key1), value1Second); + assertEquals("Props should have params's key", propsAfter.get(key1), value1Second); + assertNotNull("Should have props key", params.get(key2)); + } + + @Test + public void notConfigured() { + final Map<String, String> params = new HashMap<>(); + + // Ensure an unconfigured parameters map will say kafka export is disabled. 
+ final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + assertFalse(kafkaParams.isExportToKafka()); + } + + @Test + public void testKafkaResultExporterFactory() { + KafkaResultExporterFactory factory = new KafkaResultExporterFactory(); + assertNotNull(factory); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index 6bb7105..b7adad6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -56,5 +56,39 @@ <groupId>org.apache.fluo</groupId> <artifactId>fluo-api</artifactId> </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <!-- Testing dependencies. 
--> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <classifier>test</classifier> +<!-- <scope>test</scope> --> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index 293426f..fa9a10e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -390,11 +390,6 @@ public abstract class ITBase { return conf; } - /** - * Setup a Mini Fluo cluster that uses a temporary directory to store its data. - * - * @return A Mini Fluo cluster. - */ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { // Setup the observers that will be used by the Fluo PCJ Application. final List<ObserverSpecification> observers = new ArrayList<>(); @@ -403,14 +398,9 @@ public abstract class ITBase { observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); + // Set export details for exporting from Fluo to a Rya repository and a subscriber queue. 
final HashMap<String, String> exportParams = new HashMap<>(); - final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); - ryaParams.setAccumuloInstanceName(instanceName); - ryaParams.setZookeeperServers(zookeepers); - ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); - ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); + setExportParameters(exportParams); // Configure the export observer to export new PCJ results to the mini accumulo cluster. final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); @@ -433,4 +423,20 @@ public abstract class ITBase { FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) ); return FluoFactory.newMiniFluo(config); } + + /** + * Set export details for exporting from Fluo to a Rya repository and a subscriber queue. + * Override this if you have custom export destinations. 
+ * + * @param exportParams + */ + protected void setExportParameters(final HashMap<String, String> exportParams) { + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); + ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java new file mode 100644 index 0000000..10d2530 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.BindingImpl; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import 
kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +/** + * Performs integration tests over the Fluo application geared towards Kafka PCJ exporting. + * <p> + * These tests might be ignored so that they will not run as unit tests while building the application. + * Run this test from the Maven command line: + * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration + * $ mvn surefire:test -Dtest=KafkaExportIT + */ +public class KafkaExportIT extends ITBase { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC = "testTopic"; + private ZkUtils zkUtils; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + + /** + * Sets up the mini resources of the super class first, then starts an embedded ZooKeeper + * and a single Kafka broker that registers with it. + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources() + */ + @Override + public void setupMiniResources() throws Exception { + super.setupMiniResources(); + + // Start the embedded ZooKeeper and connect the client utilities used to create topics. + zkServer = new EmbeddedZookeeper(); + String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // Set up the broker: logs go to a fresh temp directory, the listener uses the fixed BROKERHOST:BROKERPORT. + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + System.out.println("setup kafka and fluo."); + } + + /** + * Test kafka without rya code to make sure kafka works in this environment. 
+ * If this test fails then it is a testing environment issue, not a problem with Rya. + * Source: https://github.com/asmaier/mini-kafka + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void embeddedKafkaTest() throws InterruptedException, IOException { + + // create the topic with one partition and a replication factor of one + AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + // set up the producer + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps); + + // set up the consumer + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty("group.id", "group0"); + consumerProps.setProperty("client.id", "consumer0"); + consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic + KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TOPIC)); + + // send one message, then close the producer + ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); + producer.send(data); + producer.close(); + + // poll the consumer and expect exactly the one record that was sent + ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); + assertEquals(1, records.count()); + Iterator<ConsumerRecord<Integer, 
byte[]> record = recordIterator.next(); + System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); + assertEquals(42, (int) record.key()); + assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + consumer.close(); + } + + @Test + public void newResultsExportedTest() throws Exception { + final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = <http://Alice>) " + "FILTER(?city = <http://London>) " + "?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + "?worker <http://worksAt> <http://Chipotle>. " + "}"; + + // Triples that will be streamed into Fluo after the PCJ has been created. + final Set<RyaStatement> streamedTriples = Sets.newHashSet(makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), makeRyaStatement("http://Bob", "http://livesIn", "http://London"), makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://David"), makeRyaStatement("http://David", "http://livesIn", "http://London"), makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), makeRyaStatement("http://Frank", "http://livesIn", "http://London"), makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); + + // The expected results of the SPARQL query once the PCJ has been computed. 
+ final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London")))); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), new BindingImpl("city", new URIImpl("http://London")))); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + CreatePcj createPcj = new CreatePcj(); + String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String> absent()); + + // Wait for the observers to finish working, then read the exported results from Kafka. 
+ fluo.waitForObservers(); + + // try-with-resources closes the consumer even when an assertion below fails. + try (KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(QueryIdIsTopicName)) { + // Poll once for the exported binding sets. + ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(3000); + Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + boolean allExpected = true; + ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null; + while (recordIterator.hasNext()) { + ConsumerRecord<Integer, VisibilityBindingSet> record = recordIterator.next(); + System.out.printf("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString()); + boolean expectedThis = expected.contains(record.value()); + if (!expectedThis) { + System.out.println("This consumed record is not expected."); + unexpectedRecord = record; + } + allExpected = allExpected && expectedThis; + } + // Every consumed record must be one of the expected results, and at least one must have arrived. + assertTrue("Must consume expected record: not expected:" + unexpectedRecord, allExpected); + assertNotEquals("Should get some results", 0, records.count()); + } + } + + /** + * A helper function for creating a {@link BindingSet} from an array of + * {@link Binding}s. + * + * @param bindings + * - The bindings to include in the set. (not null) + * @return A {@link BindingSet} holding the bindings. + */ + protected static BindingSet makeBindingSet(final Binding... 
bindings) { + return new VisibilityBindingSet(ITBase.makeBindingSet(bindings)); + } + + /** + * Creates a Kafka consumer that points at the test broker, starts from the earliest offset + * and is subscribed to the given topic. The consumer is returned open; this method does not close it. + * + * @param TopicName + * - The name of the topic to subscribe to. + * @return A consumer subscribed to {@code TopicName}. + */ + protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(String TopicName) { + // setup consumer + Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); + // NOTE(review): a class named BindingSetSerializer is configured as the value deserializer; presumably it implements both roles - confirm. + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + // "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic + /// KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); + KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TopicName)); + return consumer; + } + + /** + * Add info about the kafka queue/topic to receive the export. + * Call super to get the Rya parameters. 
+ * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) + */ + @Override + protected void setExportParameters(HashMap<String, String> exportParams) { + // Get the defaults + super.setExportParameters(exportParams); + // Add the kafka parameters + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(true); + // Configure the Producer + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + // "org.apache.kafka.common.serialization.StringSerializer"); + kafkaParams.setProducerConfig(producerConfig); + } + + /** + * Shuts down the resources of the super class, then the mini Kafka server, the ZooKeeper client and the mini ZooKeeper. + * The Kafka and ZooKeeper fixtures are released even if the super class teardown throws, + * and fixtures that were never created (setup failed early) are skipped. + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() + */ + @Override + public void shutdownMiniResources() { + try { + super.shutdownMiniResources(); + } finally { + // The broker listens on a fixed port, so leaving it running would break whichever test starts next. + if (kafkaServer != null) { + kafkaServer.shutdown(); + } + if (zkClient != null) { + zkClient.close(); + } + if (zkServer != null) { + zkServer.shutdown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.prospector/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml index 0a3b6cf..952ab94 100644 --- a/extras/rya.prospector/pom.xml +++ b/extras/rya.prospector/pom.xml @@ -75,6 +75,45 @@ under the License. </excludes> </configuration> </plugin> + <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <versionRange>[3.2,)</versionRange> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-eclipse-compiler</artifactId> + <versionRange>[2.9.1-01,)</versionRange> + <goals> + <goal>add-groovy-build-paths</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> </plugins> </pluginManagement> <plugins>