http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java new file mode 100644 index 0000000..797502c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java @@ -0,0 +1,39 @@ +package org.apache.rya.indexing.pcj.fluo.app.export; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; + +/** + * Incrementally exports {@link RyaSubGraph}s that are generated by SPARQL Construct Queries + * from the Rya-Fluo application to an external system. The destination is decided by the + * implementation (for example, a Kafka topic). 
+ * + */ +public interface IncrementalRyaSubGraphExporter extends AutoCloseable { + + /** + * Exports a RyaSubGraph that is the result of a SPARQL Construct Query. + * + * @param constructID - The Fluo Id of the construct query that created the RyaSubGraph + * @param subgraph - The RyaSubGraph to export (non-null) + * @throws ResultExportException The result could not be exported. + */ + public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException; + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java new file mode 100644 index 0000000..ecbec09 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java @@ -0,0 +1,47 @@ +package org.apache.rya.indexing.pcj.fluo.app.export; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; + +import com.google.common.base.Optional; + +/** + * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided + * configurations. The exceptions declared by {@link #build(Context)} are the nested + * exception types of {@link IncrementalBindingSetExporterFactory}. + */ +public interface IncrementalRyaSubGraphExporterFactory { + + /** + * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the + * configurations that are provided. + * + * @param context - Contains the host application's configuration values + * and any parameters that were provided at initialization. (not null) + * @return A Guava {@link Optional} holding an exporter if configurations were found in the + * context; otherwise {@link Optional#absent()}. + * @throws IncrementalExporterFactoryException A non-configuration related + * problem has occurred and the exporter could not be created as a result. + * @throws ConfigurationException Thrown if configuration values were + * provided, but an instance of the exporter could not be initialized + * using them. This could be because they were improperly formatted, + * a required field was missing, or some other configuration based problem. 
+ */ + public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java new file mode 100644 index 0000000..152d156 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * Incrementally exports SPARQL query results to Kafka topics. + */ +public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { + private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class); + + private final KafkaProducer<String, VisibilityBindingSet> producer; + + /** + * Constructs an instance given a Kafka producer. + * + * @param producer + * for sending result set alerts to a broker. (not null) + * Can be created and configured by {@link KafkaBindingSetExporterFactory} + */ + public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) { + super(); + checkNotNull(producer, "Producer is required."); + this.producer = producer; + } + + /** + * Sends the result to the Kafka topic whose name is the PCJ ID stored for the query, + * and does not return until the send has completed. + * + * @param fluoTx - Used to read the PCJ ID stored under {@link FluoQueryColumns#RYA_PCJ_ID}. (not null) + * @param queryId - The ID of the query within Fluo; it is the row the PCJ ID is read from. (not null) + * @param result - The binding set to publish. (not null) + * @throws ResultExportException The PCJ ID could not be read, the send failed, or the calling + * thread was interrupted while waiting for the send to complete. + */ + @Override + public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { + checkNotNull(fluoTx); + checkNotNull(queryId); + checkNotNull(result); + try { + final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + if (log.isTraceEnabled()) { + // Only build the message when it will be logged; it stringifies the whole result. + log.trace("out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result); + } + + // Send the result to the topic whose name matches the PCJ ID. 
+ final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result); + final Future<RecordMetadata> future = producer.send(rec); + + // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. + future.get(); + + log.debug("producer.send(rec) completed"); + + } catch (final InterruptedException e) { + // Restore the interrupt flag so the thread running this export can still observe the interruption. + Thread.currentThread().interrupt(); + throw new ResultExportException("A result could not be exported to Kafka.", e); + } catch (final Throwable e) { + throw new ResultExportException("A result could not be exported to Kafka.", e); + } + } + + /** + * Closes the underlying Kafka producer, using a 5 second timeout. + */ + @Override + public void close() throws Exception { + producer.close(5, TimeUnit.SECONDS); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java new file mode 100644 index 0000000..5507037 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.base.Optional; + +/** + * Creates instances of {@link KafkaBindingSetExporter}. 
+ * <p/> + * Configure a Kafka producer by adding several required Key/values as described here: + * http://kafka.apache.org/documentation.html#producerconfigs + * <p/> + * The producer built here is typed {@code KafkaProducer<String, VisibilityBindingSet>}, so the + * configured key serializer must accept Strings and the configured value serializer must accept + * {@link VisibilityBindingSet}s. Here is a simple example, where + * {@code visibilityBindingSetSerializerClassName} stands for the class name of such a value serializer: + * <pre> + * Properties producerConfig = new Properties(); + * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, visibilityBindingSetSerializerClassName); + * </pre> + * + * @see ProducerConfig + */ +public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory { + private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class); + + /** + * Builds a {@link KafkaBindingSetExporter} when the observer configuration enables export to Kafka. + * + * @param context - Supplies the observer configuration the Kafka export parameters are read from. (not null) + * @return An exporter backed by a newly created Kafka producer if export to Kafka is enabled; otherwise absent. + */ + @Override + public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); + // The message names this class; it previously named the pre-rename KafkaResultExporterFactory. + log.debug("KafkaBindingSetExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); + if (exportParams.isExportToKafka()) { + // Setup Kafka connection + KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig()); + // Create the exporter + final IncrementalBindingSetExporter exporter = new KafkaBindingSetExporter(producer); + return Optional.of(exporter); + } else { + return Optional.absent(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java deleted file mode 100644 index 72ec947..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.app.export.kafka; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -/** - * Incrementally exports SPARQL query results to Kafka topics. - */ -public class KafkaResultExporter implements IncrementalResultExporter { - private static final Logger log = Logger.getLogger(KafkaResultExporter.class); - - private final KafkaProducer<String, VisibilityBindingSet> producer; - - /** - * Constructs an instance given a Kafka producer. - * - * @param producer - * for sending result set alerts to a broker. (not null) - * Can be created and configured by {@link KafkaResultExporterFactory} - */ - public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) { - super(); - checkNotNull(producer, "Producer is required."); - this.producer = producer; - } - - /** - * Send the results to the topic using the queryID as the topicname - */ - @Override - public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { - checkNotNull(fluoTx); - checkNotNull(queryId); - checkNotNull(result); - try { - final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; - log.trace(msg); - - // Send the result to the topic whose name matches the PCJ ID. 
- final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result); - final Future<RecordMetadata> future = producer.send(rec); - - // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. - future.get(); - - log.debug("producer.send(rec) completed"); - - } catch (final Throwable e) { - throw new ResultExportException("A result could not be exported to Kafka.", e); - } - } - - @Override - public void close() throws Exception { - producer.close(5, TimeUnit.SECONDS); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java deleted file mode 100644 index 995e9d9..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.kafka; - -import org.apache.fluo.api.observer.Observer.Context; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -import com.google.common.base.Optional; - -/** - * Creates instances of {@link KafkaResultExporter}. 
- * <p/> - * Configure a Kafka producer by adding several required Key/values as described here: - * http://kafka.apache.org/documentation.html#producerconfigs - * <p/> - * Here is a simple example: - * <pre> - * Properties producerConfig = new Properties(); - * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - * </pre> - * - * @see ProducerConfig - */ -public class KafkaResultExporterFactory implements IncrementalResultExporterFactory { - private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class); - @Override - public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { - final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); - if (exportParams.isExportToKafka()) { - // Setup Kafka connection - KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig()); - // Create the exporter - final IncrementalResultExporter exporter = new KafkaResultExporter(producer); - return Optional.of(exporter); - } else { - return Optional.absent(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java new file mode 100644 index 0000000..a15743f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java @@ -0,0 +1,81 @@ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; + +import com.google.common.base.Preconditions; + +/** + * Exports {@link RyaSubGraph}s to Kafka from the Rya Fluo Application. + * + */ +public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter { + + private final KafkaProducer<String, RyaSubGraph> producer; + private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporter.class); + + /** + * Constructs an exporter that publishes through the given Kafka producer. + * + * @param producer - The producer the subgraphs are sent with. (not null) + */ + public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> producer) { + checkNotNull(producer); + this.producer = producer; + } + + /** + * Exports the RyaSubGraph to the Kafka topic named by {@link RyaSubGraph#getId()}, + * and does not return until the send has completed. + * @param constructID - rowID of result that is exported. Used for logging purposes. (not null) + * @param subGraph - RyaSubGraph exported to Kafka (not null) + * @throws ResultExportException The send failed, or the calling thread was interrupted while + * waiting for the send to complete. + */ + @Override + public void export(String constructID, RyaSubGraph subGraph) throws ResultExportException { + checkNotNull(constructID); + checkNotNull(subGraph); + try { + // Send the result to the topic whose name matches the subgraph's ID. + final ProducerRecord<String, RyaSubGraph> rec = new ProducerRecord<>(subGraph.getId(), subGraph); + final Future<RecordMetadata> future = producer.send(rec); + + // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. 
+ future.get(); + + if (log.isDebugEnabled()) { + // Only build the message when it will be logged; it stringifies every statement of the subgraph. + log.debug("Producer successfully sent record with id: " + constructID + " and statements: " + subGraph.getStatements()); + } + + } catch (final InterruptedException e) { + // Restore the interrupt flag so the thread running this export can still observe the interruption. + Thread.currentThread().interrupt(); + throw new ResultExportException("A result could not be exported to Kafka.", e); + } catch (final Throwable e) { + throw new ResultExportException("A result could not be exported to Kafka.", e); + } + } + + /** + * Closes the underlying Kafka producer, using a 5 second timeout. + */ + @Override + public void close() throws Exception { + producer.close(5, TimeUnit.SECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java new file mode 100644 index 0000000..2c1e4c0 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java @@ -0,0 +1,62 @@ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory; + +import com.google.common.base.Optional; + +/** + * Produces {@link KafkaRyaSubGraphExporter}s, which publish {@link RyaSubGraph}s + * from the Rya Fluo application to Kafka. + */ +public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory { + + private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class); + + /** + * Creates a {@link KafkaRyaSubGraphExporter} when the observer configuration enables export to Kafka. 
+ * @param context - {@link Context} object whose observer configuration supplies the Kafka export parameters + * @return an Optional holding a {@link KafkaRyaSubGraphExporter} when the parameters enable export to Kafka; + * otherwise an absent Optional + * @throws IncrementalExporterFactoryException declared by the factory interface; this method contains no explicit throw of it + * @throws ConfigurationException declared by the factory interface; this method contains no explicit throw of it + */ + @Override + public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaExportParameters kafkaParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + kafkaParams.isExportToKafka()); + + // Nothing to build unless the configuration asks for export to Kafka. + if (!kafkaParams.isExportToKafka()) { + return Optional.absent(); + } + + // Open the Kafka connection and hand it to a new exporter. + final KafkaProducer<String, RyaSubGraph> kafkaProducer = new KafkaProducer<>(kafkaParams.listAllConfig()); + final IncrementalRyaSubGraphExporter subGraphExporter = new KafkaRyaSubGraphExporter(kafkaProducer); + return Optional.of(subGraphExporter); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java new file mode 100644 index 0000000..ed20e8a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java @@ -0,0 +1,100 @@ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.io.ByteArrayOutputStream; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.serialization.kryo.RyaSubGraphSerializer; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Kafka {@link Serializer} and {@link Deserializer} for {@link RyaSubGraph}s. 
+ * + */ +public class RyaSubGraphKafkaSerDe implements Serializer<RyaSubGraph>, Deserializer<RyaSubGraph> { + + private final Kryo kryo; + + public RyaSubGraphKafkaSerDe() { + kryo = new Kryo(); + kryo.register(RyaSubGraph.class,new RyaSubGraphSerializer()); + } + + /** + * Deserializes from bytes to a RyaSubGraph + * @param bundleBytes - byte representation of RyaSubGraph + * @return - Deserialized RyaSubGraph + */ + public RyaSubGraph fromBytes(byte[] bundleBytes) { + return kryo.readObject(new Input(bundleBytes), RyaSubGraph.class); + } + + /** + * Serializes RyaSubGraph to bytes + * @param bundle - RyaSubGraph to be serialized + * @return - serialized bytes from RyaSubGraph + */ + public byte[] toBytes(RyaSubGraph bundle) { + Output output = new Output(new ByteArrayOutputStream()); + kryo.writeObject(output, bundle, new RyaSubGraphSerializer()); + return output.getBuffer(); + } + + /** + * Deserializes RyaSubGraph + * @param topic - topic that data is associated with (no effect) + * @param bundleBytes - bytes to be deserialized + * @return - deserialized RyaSubGraph + */ + @Override + public RyaSubGraph deserialize(String topic, byte[] bundleBytes) { + return fromBytes(bundleBytes); + } + + /** + * Serializes RyaSubGraph + * @param subgraph - subgraph to be serialized + * @param topic - topic that data is associated with + * @return - serialized bytes from subgraph + */ + @Override + public byte[] serialize(String topic, RyaSubGraph subgraph) { + return toBytes(subgraph); + } + + /** + * Closes serializer (no effect) + */ + @Override + public void close() { + } + + /** + * Configures serializer (no effect) + */ + @Override + public void configure(Map<String, ?> arg0, boolean arg1) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java new file mode 100644 index 0000000..84d3ce6 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.Collections; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. + */ +public class RyaBindingSetExporter implements IncrementalBindingSetExporter { + + private final PrecomputedJoinStorage pcjStorage; + + /** + * Creates an exporter that writes to the given storage. + * + * @param pcjStorage - Where exported results are stored. (not null) + */ + public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) { + checkNotNull(pcjStorage); + this.pcjStorage = pcjStorage; + } + + /** + * Adds a single result to the PCJ that the query's results are exported to. + * + * @param fluoTx - Used to look up the PCJ ID recorded for the query. (not null) + * @param queryId - The Fluo row the PCJ ID is read from. (not null) + * @param result - The result to store. (not null) + * @throws ResultExportException The PCJ storage rejected the result. + */ + @Override + public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) + throws ResultExportException { + // All three arguments are mandatory. + requireNonNull(fluoTx); + requireNonNull(queryId); + requireNonNull(result); +
+ // The PCJ ID is read from the query's RYA_PCJ_ID column in Fluo. + final String targetPcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + + try { + pcjStorage.addResults(targetPcjId, Collections.singleton(result)); + } catch (final PCJStorageException e) { + throw new ResultExportException("A result could not be exported to Rya.", e); + } + } + + /** + * Closes the underlying PCJ storage. + */ + @Override + public void close() throws Exception { + pcjStorage.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java new file mode 100644 index 0000000..86d593f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; + +import com.google.common.base.Optional; + +import org.apache.fluo.api.observer.Observer.Context; + +/** + * Creates instances of {@link RyaBindingSetExporter}. + */ +public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory { + + /** + * Builds a {@link RyaBindingSetExporter} when the observer configuration enables exporting to Rya. + * + * @param context - Supplies the observer configuration that is parsed. (not null) + * @return The exporter, or absent when exporting to Rya is not enabled. + * @throws IncrementalExporterFactoryException The Accumulo connector could not be created. + */ + @Override + public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { + checkNotNull(context); + + final RyaExportParameters exportParams = new RyaExportParameters( context.getObserverConfiguration().toMap() ); + + // Nothing to build unless exporting to Rya has been switched on. + if(!exportParams.isExportToRya()) { + return Optional.absent(); + } + + // Any ';' in the Zookeeper server list is swapped for ',' before it is handed to Accumulo. + final String instanceName = exportParams.getAccumuloInstanceName().get(); + final String zookeepers = exportParams.getZookeeperServers().get().replaceAll(";", ","); + final Instance accumulo = new ZooKeeperInstance(instanceName, zookeepers); + + try { +
+ // Connect as the configured exporter user. + final String username = exportParams.getExporterUsername().get(); + final String password = exportParams.getExporterPassword().get(); + final Connector connector = accumulo.getConnector(username, new PasswordToken(password)); + + // The exporter writes through PCJ storage created for the configured Rya instance name. + final PrecomputedJoinStorage storage = new AccumuloPcjStorage(connector, exportParams.getRyaInstanceName().get()); + return Optional.<IncrementalBindingSetExporter>of(new RyaBindingSetExporter(storage)); + + } catch (final AccumuloException | AccumuloSecurityException e) { + throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java index cba6a43..a1ba5b8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java @@ -45,6 +45,7 @@ public class RyaExportParameters extends ParametersBase { public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword"; public static final String CONF_RYA_INSTANCE_NAME = "pcj.fluo.export.rya.ryaInstanceName"; + public static final String CONF_FLUO_APP_NAME = "pcj.fluo.export.rya.fluo.application.name"; /** 
* Constructs an instance of {@link RyaExportParameters}. @@ -147,4 +148,18 @@ public class RyaExportParameters extends ParametersBase { public Optional<String> getExporterPassword() { return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) ); } + + /** + * @param fluoApplicationName - The name of the Rya Fluo application + */ + public void setFluoApplicationName(@Nullable final String fluoApplicationName) { + params.put(CONF_FLUO_APP_NAME, fluoApplicationName); + } + + /** + * @return The name of the Rya Fluo application + */ + public Optional<String> getFluoApplicationName() { + return Optional.fromNullable(params.get(CONF_FLUO_APP_NAME)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java deleted file mode 100644 index b8b3c45..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import java.util.Collections; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -/** - * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. - */ -public class RyaResultExporter implements IncrementalResultExporter { - - private final PrecomputedJoinStorage pcjStorage; - - /** - * Constructs an instance of {@link RyaResultExporter}. - * - * @param pcjStorage - The PCJ storage the new results will be exported to. (not null) - */ - public RyaResultExporter(final PrecomputedJoinStorage pcjStorage) { - this.pcjStorage = checkNotNull(pcjStorage); - } - - @Override - public void export( - final TransactionBase fluoTx, - final String queryId, - final VisibilityBindingSet result) throws ResultExportException { - requireNonNull(fluoTx); - requireNonNull(queryId); - requireNonNull(result); - - // Look up the ID the PCJ represents within the PCJ Storage. 
- final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - - try { - pcjStorage.addResults(pcjId, Collections.singleton(result)); - } catch (final PCJStorageException e) { - throw new ResultExportException("A result could not be exported to Rya.", e); - } - } - - @Override - public void close() throws Exception { - pcjStorage.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java deleted file mode 100644 index c695272..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.observer.Observer.Context; - -/** - * Creates instances of {@link RyaResultExporter}. - */ -public class RyaResultExporterFactory implements IncrementalResultExporterFactory { - - @Override - public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { - checkNotNull(context); - - // Wrap the context's parameters for parsing. - final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() ); - - if(params.isExportToRya()) { - // Setup Zookeeper connection info. - final String accumuloInstance = params.getAccumuloInstanceName().get(); - final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ","); - final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers); - - try { - // Setup Accumulo connection info. 
- final String exporterUsername = params.getExporterUsername().get(); - final String exporterPassword = params.getExporterPassword().get(); - final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword)); - - // Setup Rya PCJ Storage. - final String ryaInstanceName = params.getRyaInstanceName().get(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName); - - // Make the exporter. - final IncrementalResultExporter exporter = new RyaResultExporter(pcjStorage); - return Optional.of(exporter); - - } catch (final AccumuloException | AccumuloSecurityException e) { - throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e); - } - } else { - return Optional.absent(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index dc4b3b4..ac131e3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -26,11 +26,13 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater; +import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import 
org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; @@ -57,6 +59,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { private final FilterResultUpdater filterUpdater = new FilterResultUpdater(); private final QueryResultUpdater queryUpdater = new QueryResultUpdater(); private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater(); + private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater(); @Override public abstract ObservedColumn getObservedColumn(); @@ -102,6 +105,15 @@ public abstract class BindingSetUpdater extends AbstractObserver { } break; + case CONSTRUCT: + final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId); + try{ + constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery); + } catch (final Exception e) { + throw new RuntimeException("Could not process a Query node.", e); + } + break; + case FILTER: final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId); try { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java new file mode 100644 index 0000000..f0fef07 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java @@ -0,0 +1,198 @@ +package org.apache.rya.indexing.pcj.fluo.app.observers; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import static com.google.common.base.Preconditions.checkNotNull; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.resolver.triple.TripleRow; +import org.apache.rya.api.resolver.triple.TripleRowResolverException; +import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +/** + * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new + * Construct Query {@link RyaStatement}s and exports the results using the + * {@link IncrementalRyaSubGraphExporter}s that are registered with this + * Observer. 
+ * + */ +public class ConstructQueryResultObserver extends AbstractObserver { + + private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); + private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class); + + /** + * Deserializes the {@link RyaSubGraph}s read from Fluo. + * NOTE(review): shared by every instance of this observer; confirm it is safe for whichever + * threads end up calling {@link #process(TransactionBase, Bytes, Column)}. + */ + private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe(); + + /** + * We expect to see the same expressions a lot, so we cache the simplified + * forms. + */ + private final Map<String, String> simplifiedVisibilities = new HashMap<>(); + + /** + * Builders for each type of result exporter we support. + */ + private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet + .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build(); + + /** + * The exporters that are configured. + */ + private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
+ + /** + * Before running, determine which exporters are configured and set them up. + */ + @Override + public void init(final Context context) { + final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder(); + + for (final IncrementalRyaSubGraphExporterFactory builder : factories) { + try { + log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + builder); + + final Optional<IncrementalRyaSubGraphExporter> exporter = builder.build(context); + if (exporter.isPresent()) { + exportersBuilder.add(exporter.get()); + } + } catch (final IncrementalExporterFactoryException e) { + log.error("Could not initialize a result exporter.", e); + } + } + + exporters = exportersBuilder.build(); + } + + /** + * @return A strong notification on {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}. + */ + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.CONSTRUCT_STATEMENTS, NotificationType.STRONG); + } + + /** + * Reads the serialized {@link RyaSubGraph} stored at the notified row and column. When it holds + * statements, every statement is given the simplified visibility of the first statement and the + * subgraph is handed to each configured exporter. The statements are then written back into Fluo. + * + * @param tx - The transaction the subgraph is read from and the triples are written to. + * @param row - The notified row; its string form is passed to the exporters as the construct ID. + * @param col - The notified column. + * @throws Exception Reading, simplifying or exporting the subgraph failed. + */ + @Override + public void process(TransactionBase tx, Bytes row, Column col) throws Exception { + Bytes bytes = tx.get(row, col); + RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray()); + Set<RyaStatement> statements = subgraph.getStatements(); + if (!statements.isEmpty()) { + byte[] visibility = statements.iterator().next().getColumnVisibility(); + visibility = simplifyVisibilities(visibility); + for(RyaStatement statement: statements) { + statement.setColumnVisibility(visibility); + } + subgraph.setStatements(statements); + + for (IncrementalRyaSubGraphExporter exporter : exporters) { + exporter.export(row.toString(), subgraph); + } + } + //add generated triples back into Fluo for chaining queries together + insertTriples(tx, subgraph.getStatements()); + } + + /** + * Closes every configured exporter; a failure to close one is logged and the rest are still closed. + */ + @Override + public void close() { + if(exporters != null) { + for(final IncrementalRyaSubGraphExporter exporter : exporters) { + try { + exporter.close(); + } catch(final Exception e) { + log.warn("Problem encountered while closing one of the exporters.", e); + } + } + } + }
+ + /** + * Simplifies a visibility expression, reusing a previously simplified form when one is cached. + * + * @param visibilityBytes - The UTF-8 bytes of the visibility expression. + * @return The UTF-8 bytes of the simplified expression. + * @throws UnsupportedEncodingException Declared by the charset-name based String conversions. + */ + private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException { + // Simplify the result's visibilities and cache new simplified + // visibilities + String visibility = new String(visibilityBytes, "UTF-8"); + if (!simplifiedVisibilities.containsKey(visibility)) { + String simplified = VisibilitySimplifier.simplify(visibility); + simplifiedVisibilities.put(visibility, simplified); + } + return simplifiedVisibilities.get(visibility).getBytes("UTF-8"); + } + + /** + * Writes each statement into the {@link FluoQueryColumns#TRIPLES} column, using its SPO row as + * the Fluo row and its visibility (an empty byte array when it has none) as the value. A + * statement that cannot be converted to the SPO format is logged and skipped. + * + * @param tx - The transaction the triples are written to. + * @param triples - The statements to write. + */ + private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) { + + for (final RyaStatement triple : triples) { + Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); + try { + tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); + } catch (final TripleRowResolverException e) { + // Keep the cause in the log; the message alone does not say why conversion failed. + log.error("Could not convert a Triple into the SPO format: " + triple, e); + } + } + }
+ + + /** + * Converts a triple into a byte[] holding the Rya SPO representation of it. + * + * @param triple - The triple to convert. (not null) + * @return The Rya SPO representation of the triple. + * @throws TripleRowResolverException The triple could not be converted. + */ + public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { + checkNotNull(triple); + final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); + final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); + return spoRow.getRow(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 28c92af..b675ba7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -30,12 +30,12 @@ import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.accumulo.utils.VisibilitySimplifier; import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -58,16 +58,16 @@ public class QueryResultObserver extends AbstractObserver { /** * Builders for each type of result exporter we support. */ - private static final ImmutableSet<IncrementalResultExporterFactory> factories = - ImmutableSet.<IncrementalResultExporterFactory>builder() - .add(new RyaResultExporterFactory()) - .add(new KafkaResultExporterFactory()) + private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories = + ImmutableSet.<IncrementalBindingSetExporterFactory>builder() + .add(new RyaBindingSetExporterFactory()) + .add(new KafkaBindingSetExporterFactory()) .build(); /** * The exporters that are configured. 
*/ - private ImmutableSet<IncrementalResultExporter> exporters = null; + private ImmutableSet<IncrementalBindingSetExporter> exporters = null; @Override public ObservedColumn getObservedColumn() { @@ -79,13 +79,13 @@ public class QueryResultObserver extends AbstractObserver { */ @Override public void init(final Context context) { - final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder(); - - for(final IncrementalResultExporterFactory builder : factories) { - log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder(); + for(final IncrementalBindingSetExporterFactory builder : factories) { try { - final Optional<IncrementalResultExporter> exporter = builder.build(context); + log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + + final Optional<IncrementalBindingSetExporter> exporter = builder.build(context); if(exporter.isPresent()) { exportersBuilder.add(exporter.get()); } @@ -117,7 +117,7 @@ public class QueryResultObserver extends AbstractObserver { result.setVisibility( simplifiedVisibilities.get(visibility) ); // Export the result using each of the provided exporters. 
- for(final IncrementalResultExporter exporter : exporters) { + for(final IncrementalBindingSetExporter exporter : exporters) { try { exporter.export(tx, queryId, result); } catch (final ResultExportException e) { @@ -129,7 +129,7 @@ public class QueryResultObserver extends AbstractObserver { @Override public void close() { if(exporters != null) { - for(final IncrementalResultExporter exporter : exporters) { + for(final IncrementalBindingSetExporter exporter : exporters) { try { exporter.close(); } catch(final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java new file mode 100644 index 0000000..e836c5d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java @@ -0,0 +1,192 @@ +package org.apache.rya.indexing.pcj.fluo.app.query; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Metadata object used to store metadata for Construct Query Nodes found in + * SPARQL queries. + * + */ +public class ConstructQueryMetadata extends CommonNodeMetadata { + + private String childNodeId; + private ConstructGraph graph; + private String sparql; + + /** + * Creates ConstructQueryMetadata object from the provided metadata arguments. 
+ * @param nodeId - id for the ConstructQueryNode + * @param childNodeId - id for the child of the ConstructQueryNode + * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statement representing construct graph + * @param sparql - SPARQL query containing construct graph + */ + public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) { + super(nodeId, new VariableOrder("subject", "predicate", "object")); + Preconditions.checkNotNull(childNodeId); + Preconditions.checkNotNull(graph); + Preconditions.checkNotNull(sparql); + this.childNodeId = childNodeId; + this.graph = graph; + this.sparql = sparql; + } + + /** + * @return sparql query string representing this construct query + */ + public String getSparql() { + return sparql; + } + + /** + * @return The node whose results are projected onto the given + * {@link ConstructGraph}. + */ + public String getChildNodeId() { + return childNodeId; + } + + /** + * @return The ConstructGraph used to form statement {@link BindingSet}s for + * this Construct Query + */ + public ConstructGraph getConstructGraph() { + return graph; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o instanceof ConstructQueryMetadata) { + ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o; + if (super.equals(queryMetadata)) { + return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph) + .append(sparql, queryMetadata.sparql).isEquals(); + } + return false; + } + return false; + } + + @Override + public String toString() { + return new StringBuilder().append("Construct Query Metadata {\n").append(" Node ID: " + super.getNodeId() + "\n") + .append(" SPARQL QUERY: " + sparql + "\n").append(" Variable Order: " + 
super.getVariableOrder() + "\n") + .append(" Child Node ID: " + childNodeId + "\n").append(" Construct Graph: " + graph.getProjections() + "\n") + .append("}").toString(); + } + + /** + * Creates a new {@link Builder} for this class. + * + * @return A new {@link Builder} for this class. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builds instances of {@link ConstructQueryMetadata}. + */ + @DefaultAnnotation(NonNull.class) + public static final class Builder { + + + private String nodeId; + private ConstructGraph graph; + private String childNodeId; + private String sparql; + + /** + * Set the node Id that identifies this Construct Query Node + * + * @param nodeId + * id for this node + * @return This builder so that method invocations may be chained. + */ + public Builder setNodeId(String nodeId) { + this.nodeId = nodeId; + return this; + } + + /** + * Set the SPARQL String representing this construct query + * @param sparql - SPARQL string representing this construct query + */ + public Builder setSparql(String sparql) { + this.sparql = sparql; + return this; + } + + /** + * Set the ConstructGraph used to form statement {@link BindingSet}s for + * this Construct Query + * + * @param graph + * - ConstructGraph to project {@link BindingSet}s onto RDF + * statements + * @return This builder so that method invocations may be chained. + */ + public Builder setConstructGraph(ConstructGraph graph) { + this.graph = graph; + return this; + } + + /** + * Set the node whose results are projected onto the given + * {@link ConstructGraph}. + * + * @param childNodeId + * - The node whose results are projected onto the given + * {@link ConstructGraph}. + * @return This builder so that method invocations may be chained. + */ + public Builder setChildNodeId(String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + /** + * @return An instance of {@link ConstructQueryMetadata} built using + * this builder's values. 
+ */ + public ConstructQueryMetadata build() { + return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index 3230a5d..a701052 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import com.google.common.base.Objects; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -43,11 +44,14 @@ import net.jcip.annotations.Immutable; @DefaultAnnotation(NonNull.class) public class FluoQuery { - private final QueryMetadata queryMetadata; + private final Optional<QueryMetadata> queryMetadata; + private final Optional<ConstructQueryMetadata> constructMetadata; private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata; private final ImmutableMap<String, FilterMetadata> filterMetadata; private final ImmutableMap<String, JoinMetadata> joinMetadata; private final ImmutableMap<String, AggregationMetadata> aggregationMetadata; + private final QueryType type; + public static enum QueryType {Projection, Construct}; /** * Constructs an instance of {@link FluoQuery}. 
Private because applications @@ -67,21 +71,65 @@ public class FluoQuery { final QueryMetadata queryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, + final ImmutableMap<String, JoinMetadata> joinMetadata, + final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { + this.aggregationMetadata = requireNonNull(aggregationMetadata); + this.queryMetadata = Optional.of(requireNonNull(queryMetadata)); + this.constructMetadata = Optional.absent(); + this.statementPatternMetadata = requireNonNull(statementPatternMetadata); + this.filterMetadata = requireNonNull(filterMetadata); + this.joinMetadata = requireNonNull(joinMetadata); + this.type = QueryType.Projection; + } + + + /** + * Constructs an instance of {@link FluoQuery}. Private because applications + * must use {@link Builder} instead. + * + * @param constructMetadata - The root node of a query that is updated in Fluo. (not null) + * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as + * it is represented within the Fluo app. (not null) + * @param filterMetadata A map from Node ID to Filter metadata as it is represented + * within the Fluo app. (not null) + * @param joinMetadata A map from Node ID to Join metadata as it is represented + * within the Fluo app. (not null) + * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is + * represented within the Fluo app. 
(not null) + */ + private FluoQuery( + final ConstructQueryMetadata constructMetadata, + final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, + final ImmutableMap<String, FilterMetadata> filterMetadata, final ImmutableMap<String, JoinMetadata> joinMetadata, final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { - this.queryMetadata = requireNonNull(queryMetadata); + this.constructMetadata = Optional.of(requireNonNull(constructMetadata)); + this.queryMetadata = Optional.absent(); this.statementPatternMetadata = requireNonNull(statementPatternMetadata); this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); - this.aggregationMetadata = requireNonNull(aggregationMetadata); + this.aggregationMetadata = requireNonNull(aggregationMetadata); + this.type = QueryType.Construct; + } + + /** + * Returns the {@link QueryType} of this query + * @return the QueryType of this query (either Construct or Projection) + */ + public QueryType getQueryType() { + return type; } /** * @return Metadata about the root node of a query that is updated within the Fluo app. */ - public QueryMetadata getQueryMetadata() { + public Optional<QueryMetadata> getQueryMetadata() { return queryMetadata; } + + public Optional<ConstructQueryMetadata> getConstructQueryMetadata() { + return constructMetadata; + } /** * Get a Statement Pattern node's metadata. 
@@ -175,6 +223,7 @@ public class FluoQuery { final FluoQuery fluoQuery = (FluoQuery)o; return new EqualsBuilder() .append(queryMetadata, fluoQuery.queryMetadata) + .append(constructMetadata, fluoQuery.constructMetadata) .append(statementPatternMetadata, fluoQuery.statementPatternMetadata) .append(filterMetadata, fluoQuery.filterMetadata) .append(joinMetadata, fluoQuery.joinMetadata) @@ -189,8 +238,13 @@ public class FluoQuery { public String toString() { final StringBuilder builder = new StringBuilder(); - if(queryMetadata != null) { - builder.append( queryMetadata.toString() ); + if(queryMetadata.isPresent()) { + builder.append( queryMetadata.get().toString() ); + builder.append("\n"); + } + + if(constructMetadata.isPresent()) { + builder.append( constructMetadata.get().toString() ); builder.append("\n"); } @@ -231,6 +285,7 @@ public class FluoQuery { public static final class Builder { private QueryMetadata.Builder queryBuilder = null; + private ConstructQueryMetadata.Builder constructBuilder = null; private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>(); private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>(); private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>(); @@ -239,11 +294,11 @@ public class FluoQuery { /** * Sets the {@link QueryMetadata.Builder} that is used by this builder. * - * @param queryMetadata - The builder representing the query's results. + * @param queryBuilder - The builder representing the query's results. * @return This builder so that method invocation may be chained. 
*/ - public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryMetadata) { - this.queryBuilder = queryMetadata; + public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) { + this.queryBuilder = queryBuilder; return this; } @@ -253,6 +308,26 @@ public class FluoQuery { public Optional<QueryMetadata.Builder> getQueryBuilder() { return Optional.fromNullable( queryBuilder ); } + + /** + * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder. + * + * @param constructBuilder + * - The builder representing the query's results. + * @return This builder so that method invocation may be chained. + */ + public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) { + this.constructBuilder = constructBuilder; + return this; + } + + /** + * @return The Construct Query metadata builder if one has been set. + */ + public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() { + return Optional.fromNullable( constructBuilder ); + } + /** * Adds a new {@link StatementPatternMetadata.Builder} to this builder. @@ -345,12 +420,14 @@ public class FluoQuery { requireNonNull(nodeId); return Optional.fromNullable( joinBuilders.get(nodeId) ); } + /** * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. 
*/ public FluoQuery build() { - final QueryMetadata queryMetadata = queryBuilder.build(); + Preconditions.checkArgument( + (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null)); final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder(); for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) { @@ -372,7 +449,14 @@ public class FluoQuery { aggregateMetadata.put(entry.getKey(), entry.getValue().build()); } - return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + if(queryBuilder != null) { + return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + } + //constructBuilder non-null in this case, but no need to check + else { + return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + } + } } } \ No newline at end of file
