http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java index 5507037..b796a6f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java @@ -23,7 +23,8 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import com.google.common.base.Optional; @@ -44,13 +45,13 @@ import com.google.common.base.Optional; * * @see ProducerConfig */ -public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory { +public class KafkaBindingSetExporterFactory implements IncrementalResultExporterFactory { private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class); @Override - public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { - final 
KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); - if (exportParams.isExportToKafka()) { + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaBindingSetExporterParameters exportParams = new KafkaBindingSetExporterParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaBindingSetExporter()); + if (exportParams.getUseKafkaBindingSetExporter()) { // Setup Kafka connection KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig()); // Create the exporter
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java new file mode 100644 index 0000000..4550a50 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Preconditions; + + +public class KafkaBindingSetExporterParameters extends KafkaExportParameterBase { + + public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = "pcj.fluo.export.kafka.bindingset.enabled"; + public static final String CONF_KAFKA_BINDING_SET_SERIALIZER = "pcj.fluo.export.kafka.bindingset.serializer"; + + public KafkaBindingSetExporterParameters(final Map<String, String> params) { + super(params); + } + + /** + * Instructs the Fluo application to use the Kafka Binding Set Exporter + * and sets the appropriate Key/Value Serializer parameters for writing BindingSets to Kafka. + * @param useExporter + * - {@code True} if the Fluo application should use the + * {@link KafkaBindingSetExporter}; otherwise {@code false}. + */ + public void setUseKafkaBindingSetExporter(final boolean useExporter) { + setBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, useExporter); + } + + /** + * @return {@code True} if the Fluo application should use the {@link KafkaBindingSetExporter}; otherwise + * {@code false}. Defaults to {@code false} if no value is present. 
+ */ + public boolean getUseKafkaBindingSetExporter() { + return getBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, false); + } + + /** + * + * @param serializer - Used for Serializing BindingSets pushed to Kafka + */ + public void setKafkaBindingSetSerializer(String serializer) { + params.put(CONF_KAFKA_BINDING_SET_SERIALIZER, Preconditions.checkNotNull(serializer)); + } + + /** + * @return - Serializer used for Serializing BindingSets to Kafka + */ + public String getKafkaBindingSetSerializer() { + return params.getOrDefault(CONF_KAFKA_BINDING_SET_SERIALIZER, KryoVisibilityBindingSetSerializer.class.getName()); + } + + @Override + public Properties listAllConfig() { + Properties props = super.listAllConfig(); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaBindingSetSerializer()); + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java new file mode 100644 index 0000000..aab3929 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.apache.fluo.api.observer.Observer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; + +import jline.internal.Preconditions; + +/** + * Provides read/write functions to the parameters map that is passed into an + * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} method related + * to PCJ exporting to a kafka topic. + * Remember: it doesn't count unless it is added to params + */ + +public class KafkaExportParameterBase extends ParametersBase { + + public KafkaExportParameterBase(final Map<String, String> params) { + super(params); + } + + /** + * Sets the bootstrap servers for reading from and writing to Kafka + * @param bootstrapServers - connect string for Kafka brokers + */ + public void setKafkaBootStrapServers(String bootstrapServers) { + params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Preconditions.checkNotNull(bootstrapServers)); + } + + /** + * @return Connect string for Kafka servers + */ + public Optional<String> getKafkaBootStrapServers() { + return Optional.ofNullable(params.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + } + + /** + * Add the properties to the params, NOT keeping them separate from the other params.
+ * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. + * + * @param producerConfig + */ + public void addAllProducerConfig(final Properties producerConfig) { + for (Object key : producerConfig.keySet().toArray()) { + Object value = producerConfig.getProperty(key.toString()); + this.params.put(key.toString(), value.toString()); + } + } + + /** + * Collect all the properties + * + * @return all the params (not just kafka producer Configuration) as a {@link Properties} + */ + public Properties listAllConfig() { + Properties props = new Properties(); + for (Object key : params.keySet().toArray()) { + Object value = params.get(key.toString()); + props.put(key.toString(), value.toString()); + } + return props; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java deleted file mode 100644 index 347a2e2..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export.kafka; - -import java.util.Map; -import java.util.Properties; - -import org.apache.fluo.api.observer.Observer; -import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; - -/** - * Provides read/write functions to the parameters map that is passed into an - * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related - * to PCJ exporting to a kafka topic. - * Remember: if doesn't count unless it is added to params - */ - -public class KafkaExportParameters extends ParametersBase { - - public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled"; - - public KafkaExportParameters(final Map<String, String> params) { - super(params); - } - - /** - * @param isExportToKafka - * - {@code True} if the Fluo application should export - * to Kafka; otherwise {@code false}. - */ - public void setExportToKafka(final boolean isExportToKafka) { - setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka); - } - - /** - * @return {@code True} if the Fluo application should export to Kafka; otherwise - * {@code false}. Defaults to {@code false} if no value is present. - */ - public boolean isExportToKafka() { - return getBoolean(params, CONF_EXPORT_TO_KAFKA, false); - } - - /** - * Add the properties to the params, NOT keeping them separate from the other params. - * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. 
- * - * @param producerConfig - */ - public void addAllProducerConfig(final Properties producerConfig) { - for (Object key : producerConfig.keySet().toArray()) { - Object value = producerConfig.getProperty(key.toString()); - this.params.put(key.toString(), value.toString()); - } - } - - /** - * Collect all the properties - * - * @return all the params (not just kafka producer Configuration) as a {@link Properties} - */ - public Properties listAllConfig() { - Properties props = new Properties(); - for (Object key : params.keySet().toArray()) { - Object value = params.get(key.toString()); - props.put(key.toString(), value.toString()); - } - return props; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java index fa27b46..da26329 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka; */ import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -26,8 +27,13 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.log4j.Logger; +import 
org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; + +import com.google.common.collect.Sets; + import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; /** @@ -76,4 +82,14 @@ public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter producer.close(5, TimeUnit.SECONDS); } + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.CONSTRUCT); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.KAFKA; + } + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java index 2c1e4c0..60e9294 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java @@ -21,10 +21,9 @@ import org.apache.fluo.api.observer.Observer.Context; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaSubGraph; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException; -import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory; import com.google.common.base.Optional; @@ -33,9 +32,11 @@ import com.google.common.base.Optional; * exporting {@link RyaSubGraph}s from the Rya Fluo application to Kafka. * */ -public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory { +public class KafkaRyaSubGraphExporterFactory implements IncrementalResultExporterFactory { private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class); + public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = "pcj.fluo.export.kafka.subgraph.enabled"; + public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = "pcj.fluo.export.kafka.subgraph.serializer"; /** * Builds a {@link KafkaRyaSubGraphExporter}. 
@@ -45,10 +46,10 @@ public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphEx * @throws ConfigurationException */ @Override - public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { - final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); - if (exportParams.isExportToKafka()) { + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaSubGraphExporterParameters exportParams = new KafkaSubGraphExporterParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaSubgraphExporter()); + if (exportParams.getUseKafkaSubgraphExporter()) { // Setup Kafka connection KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig()); // Create the exporter http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java new file mode 100644 index 0000000..1472fdd --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; + +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Preconditions; + + +public class KafkaSubGraphExporterParameters extends KafkaExportParameterBase { + + public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = "pcj.fluo.export.kafka.subgraph.enabled"; + public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = "pcj.fluo.export.kafka.subgraph.serializer"; + + public KafkaSubGraphExporterParameters(final Map<String, String> params) { + super(params); + } + + /** + * Instructs the Fluo application to use the Kafka RyaSubGraph Exporter + * and sets the appropriate Key/Value Serializer parameters for writing RyaSubGraphs to Kafka. + * @param useExporter + * - {@code True} if the Fluo application should use the + * {@link KafkaRyaSubGraphExporter}; otherwise {@code false}.
+ */ + public void setUseKafkaSubgraphExporter(final boolean useExporter) { + setBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, useExporter); + } + + /** + * @return {@code True} if the Fluo application should use the {@link KafkaRyaSubGraphExporter}; otherwise + * {@code false}. Defaults to {@code false} if no value is present. + */ + public boolean getUseKafkaSubgraphExporter() { + return getBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, false); + } + + /** + * + * @param serializer - Used for Serializing RyaSubGraphs pushed to Kafka + */ + public void setKafkaSubGraphSerializer(String serializer) { + params.put(CONF_KAFKA_SUBGRAPH_SERIALIZER, Preconditions.checkNotNull(serializer)); + } + + /** + * @return - Serializer used for Serializing RyaSubGraphs to Kafka + */ + public String getKafkaSubGraphSerializer() { + return params.getOrDefault(CONF_KAFKA_SUBGRAPH_SERIALIZER, RyaSubGraphKafkaSerDe.class.getName()); + } + + @Override + public Properties listAllConfig() { + Properties props = super.listAllConfig(); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSubGraphSerializer()); + return props; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java new file mode 100644 index 0000000..604462b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java @@ -0,0 +1,71 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.Set; + +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.collect.Sets; + +public class PeriodicBindingSetExporter implements IncrementalBindingSetExporter { + + private PeriodicQueryResultStorage periodicStorage; + + /** + * Constructs an instance of {@link PeriodicBindingSetExporter}. + * + * @param periodicStorage - The periodic query result storage the new results will be exported to.
(not null) + */ + public PeriodicBindingSetExporter(PeriodicQueryResultStorage periodicStorage) { + this.periodicStorage = checkNotNull(periodicStorage); + } + + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.PERIODIC); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.RYA; + } + + @Override + public void close() throws Exception { + } + + @Override + public void export(String queryId, VisibilityBindingSet result) throws ResultExportException { + try { + periodicStorage.addPeriodicQueryResults(queryId, Collections.singleton(result)); + } catch (PeriodicQueryStorageException e) { + throw new ResultExportException("Could not successfully export the BindingSet: " + result, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java new file mode 100644 index 0000000..0a0b767 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; + +import com.google.common.base.Optional; + +public class PeriodicBindingSetExporterFactory implements IncrementalResultExporterFactory { + + @Override + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + checkNotNull(context); + + // Wrap the context's parameters for parsing. + final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() ); + + if(params.getUsePeriodicBindingSetExporter()) { + // Setup Zookeeper connection info. 
+ final String accumuloInstance = params.getAccumuloInstanceName().get(); + final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ","); + final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers); + + try { + // Setup Accumulo connection info. + final String exporterUsername = params.getExporterUsername().get(); + final String exporterPassword = params.getExporterPassword().get(); + final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword)); + + // Setup Rya PCJ Storage. + final String ryaInstanceName = params.getRyaInstanceName().get(); + final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName); + + // Make the exporter. + final IncrementalBindingSetExporter exporter = new PeriodicBindingSetExporter(periodicStorage); + return Optional.of(exporter); + + } catch (final AccumuloException | AccumuloSecurityException e) { + throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e); + } + } else { + return Optional.absent(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java index 54c39b7..8a9dbe4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java @@ -22,55 +22,42 
@@ import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull; import java.util.Collections; +import java.util.Set; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import com.google.common.collect.Sets; + /** * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. */ public class RyaBindingSetExporter implements IncrementalBindingSetExporter { private final PrecomputedJoinStorage pcjStorage; - private final PeriodicQueryResultStorage periodicStorage; /** * Constructs an instance of {@link RyaBindingSetExporter}. * * @param pcjStorage - The PCJ storage the new results will be exported to. 
(not null) */ - public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, PeriodicQueryResultStorage periodicStorage) { + public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) { this.pcjStorage = checkNotNull(pcjStorage); - this.periodicStorage = checkNotNull(periodicStorage); } @Override - public void export( - final TransactionBase fluoTx, - final String queryId, - final VisibilityBindingSet result) throws ResultExportException { - requireNonNull(fluoTx); + public void export(final String queryId, final VisibilityBindingSet result) throws ResultExportException { requireNonNull(queryId); requireNonNull(result); - // Look up the ID the PCJ represents within the PCJ Storage. - final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - try { - if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) { - periodicStorage.addPeriodicQueryResults(pcjId, Collections.singleton(result)); - } else { - pcjStorage.addResults(pcjId, Collections.singleton(result)); - } - } catch (final PCJStorageException | PeriodicQueryStorageException e) { - throw new ResultExportException("A result could not be exported to Rya.", e); + pcjStorage.addResults(queryId, Collections.singleton(result)); + } catch (PCJStorageException e) { + throw new ResultExportException("Unable to successfully export the result: " + result, e); } } @@ -78,4 +65,14 @@ public class RyaBindingSetExporter implements IncrementalBindingSetExporter { public void close() throws Exception { pcjStorage.close(); } + + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.PROJECTION); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.RYA; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java 
---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java index 82ce9c6..a87243e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java @@ -26,8 +26,10 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.fluo.api.observer.Observer.Context; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -35,21 +37,19 @@ import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultS import com.google.common.base.Optional; -import org.apache.fluo.api.observer.Observer.Context; - /** * Creates instances of {@link RyaBindingSetExporter}. 
*/ -public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory { +public class RyaBindingSetExporterFactory implements IncrementalResultExporterFactory { @Override - public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { + public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { checkNotNull(context); // Wrap the context's parameters for parsing. final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() ); - if(params.isExportToRya()) { + if(params.getUseRyaBindingSetExporter()) { // Setup Zookeeper connection info. final String accumuloInstance = params.getAccumuloInstanceName().get(); final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ","); @@ -64,10 +64,9 @@ public class RyaBindingSetExporterFactory implements IncrementalBindingSetExport // Setup Rya PCJ Storage. final String ryaInstanceName = params.getRyaInstanceName().get(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName); - final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName); // Make the exporter. 
- final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage, periodicStorage); + final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage); return Optional.of(exporter); } catch (final AccumuloException | AccumuloSecurityException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java index a1ba5b8..aa5d3cd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java @@ -38,7 +38,8 @@ import org.apache.fluo.api.observer.Observer; @DefaultAnnotation(NonNull.class) public class RyaExportParameters extends ParametersBase { - public static final String CONF_EXPORT_TO_RYA = "pcj.fluo.export.rya.enabled"; + public static final String CONF_USE_RYA_BINDING_SET_EXPORTER = "pcj.fluo.export.rya.bindingset.enabled"; + public static final String CONF_USE_PERIODIC_BINDING_SET_EXPORTER = "pcj.fluo.export.periodic.bindingset.enabled"; public static final String CONF_ACCUMULO_INSTANCE_NAME = "pcj.fluo.export.rya.accumuloInstanceName"; public static final String CONF_ZOOKEEPER_SERVERS = "pcj.fluo.export.rya.zookeeperServers"; public static final String CONF_EXPORTER_USERNAME = "pcj.fluo.export.rya.exporterUsername"; @@ -57,19 +58,35 @@ public class RyaExportParameters extends ParametersBase { } /** - * @param isExportToRya - {@code True} if the Fluo application should export - * to 
Rya; otherwise {@code false}. + * @param useExporter - {@code True} if the Fluo application should use the {@link RyaBindingSetExporter}; otherwise + * {@code false}. */ - public void setExportToRya(final boolean isExportToRya) { - setBoolean(params, CONF_EXPORT_TO_RYA, isExportToRya); + public void setUseRyaBindingSetExporter(final boolean useExporter) { + setBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, useExporter); } /** - * @return {@code True} if the Fluo application should export to Rya; otherwise + * @return {@code True} if the Fluo application should use the {@link RyaBindingSetExporter}; otherwise * {@code false}. Defaults to {@code false} if no value is present. */ - public boolean isExportToRya() { - return getBoolean(params, CONF_EXPORT_TO_RYA, false); + public boolean getUseRyaBindingSetExporter() { + return getBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, false); + } + + /** + * @param useExporter - {@code True} if the Fluo application should use the + * {@link PeriodicBindingSetExporter}; otherwise {@code false}. + */ + public void setUsePeriodicBindingSetExporter(final boolean useExporter) { + setBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, useExporter); + } + + /** + * @return {@code True} if the Fluo application should use the {@link PeriodicBindingSetExporter}; otherwise + * {@code false}. Defaults to {@code false} if no value is present. 
+ */ + public boolean getUsePeriodicBindingSetExporter() { + return getBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, false); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java new file mode 100644 index 0000000..6a99a7e --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.util.Map; +import java.util.Optional; + +import org.apache.fluo.api.config.FluoConfiguration; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * This class manages the parameters used to construct the RyaSubGraphExporter. + * + */ +public class RyaSubGraphExportParameters extends RyaExportParameters { + + public static final String CONF_FLUO_INSTANCE = "pcj.fluo.export.rya.fluo.instance"; + public static final String CONF_FLUO_INSTANCE_ZOOKEEPERS = "pcj.fluo.export.rya.fluo.instance.zookeepers"; + public static final String CONF_FLUO_TABLE_NAME = "pcj.fluo.export.rya.fluo.table.name"; + public static final String CONF_USE_RYA_SUBGRAPH_EXPORTER = "pcj.fluo.export.rya.subgraph.enabled"; + + + public RyaSubGraphExportParameters(Map<String, String> params) { + super(params); + } + + /** + * @param useExporter - indicates whether to use the {@link RyaSubGraphExporter} + */ + public void setUseRyaSubGraphExporter(boolean useExporter) { + setBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, useExporter); + } + + /** + * @return boolean indicating whether to use the {@link RyaSubGraphExporter} + */ + public boolean getUseRyaSubGraphExporter() { + return getBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, false); + } + + /** + * @param fluoInstance - the Accumulo instance that Fluo is running on + */ + public void setFluoInstanceName(String fluoInstance) { + params.put(CONF_FLUO_INSTANCE, Preconditions.checkNotNull(fluoInstance)); + } + + /** + * @return the Accumulo instance that Fluo is running on + */ + public Optional<String> getFluoInstanceName() { + return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE)); + } + + /** + * @param fluoTable - the name of the Accumulo Fluo table + */ + public void setFluoTable(@Nullable String fluoTable) { + params.put(CONF_FLUO_TABLE_NAME, fluoTable); + } + + /** + * @return the name of the Accumulo 
Fluo table + */ + public Optional<String> getFluoTable() { + return Optional.ofNullable(params.get(CONF_FLUO_TABLE_NAME)); + } + + /** + * @param zookeepers - the zookeepers for the Fluo instance + */ + public void setFluoZookeepers(@Nullable String zookeepers) { + params.put(CONF_FLUO_INSTANCE_ZOOKEEPERS, zookeepers); + } + + /** + * @return - the zookeepers for the Fluo instance + */ + public Optional<String> getFLuoZookeepers() { + return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS)); + } + + /** + * Uses underlying parameter map to build a FluoConfiguration object + * @return - FluoConfiguration for creating a FluoClient + */ + public FluoConfiguration getFluoConfiguration() { + final FluoConfiguration config = new FluoConfiguration(); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(params.get(CONF_ACCUMULO_INSTANCE_NAME)); + config.setAccumuloUser(params.get(CONF_EXPORTER_USERNAME)); + config.setAccumuloPassword(params.get(CONF_EXPORTER_PASSWORD)); + config.setInstanceZookeepers(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS)); + config.setAccumuloZookeepers(params.get(CONF_ZOOKEEPER_SERVERS)); + + config.setApplicationName(params.get(CONF_FLUO_APP_NAME)); + config.setAccumuloTable(params.get(CONF_FLUO_TABLE_NAME)); + return config; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java new file mode 100644 index 0000000..e33ea97 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java @@ 
-0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.resolver.triple.TripleRow; +import org.apache.rya.api.resolver.triple.TripleRowResolverException; +import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import com.google.common.base.Optional; +import 
com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +/** + * This exporter is used to import {@link RyaSubGraph}s back into Fluo. By ingesting + * RyaSubGraphs back into Fluo, queries can be chained together. + * + */ +public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter { + + private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class); + private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); + private final FluoClient fluo; + + public RyaSubGraphExporter(FluoClient fluo) { + this.fluo = Preconditions.checkNotNull(fluo); + } + + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.CONSTRUCT); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.RYA; + } + + @Override + public void close() throws Exception { + fluo.close(); + } + + @Override + public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException { + insertTriples(fluo.newTransaction(), subgraph.getStatements()); + } + + private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) { + for (final RyaStatement triple : triples) { + Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); + try { + tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); + } catch (final TripleRowResolverException e) { + log.error("Could not convert a Triple into the SPO format: " + triple); + } + } + } + + /** + * Converts a triple into a byte[] holding the Rya SPO representation of it. + * + * @param triple - The triple to convert. (not null) + * @return The Rya SPO representation of the triple. + * @throws TripleRowResolverException The triple could not be converted. 
+ */ + private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { + checkNotNull(triple); + final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); + final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); + return spoRow.getRow(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java new file mode 100644 index 0000000..25f60a5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +/** + * Factory class for building {@link RyaSubGraphExporter}s. + * + */ +public class RyaSubGraphExporterFactory implements IncrementalResultExporterFactory { + + @Override + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + Preconditions.checkNotNull(context); + + RyaSubGraphExportParameters params = new RyaSubGraphExportParameters(context.getObserverConfiguration().toMap()); + + if (params.getUseRyaSubGraphExporter()) { + try { + //Get FluoConfiguration from params + FluoConfiguration conf = params.getFluoConfiguration(); + FluoClient fluo = FluoFactory.newClient(conf); + + //Create exporter + RyaSubGraphExporter exporter = new RyaSubGraphExporter(fluo); + return Optional.of(exporter); + } catch (Exception e) { + throw new IncrementalExporterFactoryException("Could not initialize the RyaSubGraphExporter", e); + } + } + return Optional.absent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java index 
1cb1594..6147fa8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java @@ -28,7 +28,6 @@ import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSeria import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.openrdf.query.BindingSet; @@ -45,8 +44,6 @@ public class AggregationObserver extends BindingSetUpdater { private static final AggregationStateSerDe STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - @Override public ObservedColumn getObservedColumn() { return new ObservedColumn(FluoQueryColumns.AGGREGATION_BINDING_SET, NotificationType.STRONG); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index 7d0fd5e..c0cfa1d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -55,7 +55,7 @@ import 
edu.umd.cs.findbugs.annotations.NonNull; public abstract class BindingSetUpdater extends AbstractObserver { private static final Logger log = Logger.getLogger(BindingSetUpdater.class); // DAO - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); // Updaters private final JoinResultUpdater joinUpdater = new JoinResultUpdater(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java index f0fef07..61e7244 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java @@ -1,4 +1,3 @@ -package org.apache.rya.indexing.pcj.fluo.app.observers; /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,54 +16,20 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; * specific language governing permissions and limitations * under the License. */ -import static com.google.common.base.Preconditions.checkNotNull; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import java.io.UnsupportedEncodingException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +package org.apache.rya.indexing.pcj.fluo.app.observers; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; -import org.apache.rya.accumulo.utils.VisibilitySimplifier; -import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaSubGraph; -import org.apache.rya.api.resolver.triple.TripleRow; -import org.apache.rya.api.resolver.triple.TripleRowResolverException; -import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory; -import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; - /** * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new * Construct Query {@link RyaStatement}s and exports the results using the @@ -74,49 +39,7 @@ import com.google.common.collect.ImmutableSet; */ public class ConstructQueryResultObserver extends AbstractObserver { - private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class); - private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe(); - - /** - * We expect to see the same expressions a lot, so we cache the simplified - * forms. - */ - private final Map<String, String> simplifiedVisibilities = new HashMap<>(); - - /** - * Builders for each type of result exporter we support. - */ - private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet - .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build(); - - /** - * The exporters that are configured. - */ - private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null; - - /** - * Before running, determine which exporters are configured and set them up. 
- */ - @Override - public void init(final Context context) { - final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder(); - - for (final IncrementalRyaSubGraphExporterFactory builder : factories) { - try { - log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + builder); - - final Optional<IncrementalRyaSubGraphExporter> exporter = builder.build(context); - if (exporter.isPresent()) { - exportersBuilder.add(exporter.get()); - } - } catch (final IncrementalExporterFactoryException e) { - log.error("Could not initialize a result exporter.", e); - } - } - - exporters = exportersBuilder.build(); - } @Override public ObservedColumn getObservedColumn() { @@ -125,74 +48,20 @@ public class ConstructQueryResultObserver extends AbstractObserver { @Override public void process(TransactionBase tx, Bytes row, Column col) throws Exception { + + //Build row for parent that result will be written to + BindingSetRow bsRow = BindingSetRow.make(row); + String constructNodeId = bsRow.getNodeId(); + String bsString= bsRow.getBindingSetString(); + String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString(); + String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString; + + //Get NodeType of the parent node + NodeType parentType = NodeType.fromNodeId(parentNodeId).get(); + //Get data for the ConstructQuery result Bytes bytes = tx.get(row, col); - RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray()); - Set<RyaStatement> statements = subgraph.getStatements(); - if (statements.size() > 0) { - byte[] visibility = statements.iterator().next().getColumnVisibility(); - visibility = simplifyVisibilities(visibility); - for(RyaStatement statement: statements) { - statement.setColumnVisibility(visibility); - } - subgraph.setStatements(statements); - - for (IncrementalRyaSubGraphExporter exporter : exporters) { - exporter.export(row.toString(), 
subgraph); - } - } - //add generated triples back into Fluo for chaining queries together - insertTriples(tx, subgraph.getStatements()); - } - - @Override - public void close() { - if(exporters != null) { - for(final IncrementalRyaSubGraphExporter exporter : exporters) { - try { - exporter.close(); - } catch(final Exception e) { - log.warn("Problem encountered while closing one of the exporters.", e); - } - } - } - } - - private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException { - // Simplify the result's visibilities and cache new simplified - // visibilities - String visibility = new String(visibilityBytes, "UTF-8"); - if (!simplifiedVisibilities.containsKey(visibility)) { - String simplified = VisibilitySimplifier.simplify(visibility); - simplifiedVisibilities.put(visibility, simplified); - } - return simplifiedVisibilities.get(visibility).getBytes("UTF-8"); + //Write result to parent + tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes); } - - private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) { - - for (final RyaStatement triple : triples) { - Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); - try { - tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); - } catch (final TripleRowResolverException e) { - log.error("Could not convert a Triple into the SPO format: " + triple); - } - } - } - - - /** - * Converts a triple into a byte[] holding the Rya SPO representation of it. - * - * @param triple - The triple to convert. (not null) - * @return The Rya SPO representation of the triple. - * @throws TripleRowResolverException The triple could not be converted. 
- */ - public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { - checkNotNull(triple); - final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); - final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); - return spoRow.getRow(); - } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index ee03334..b4edfea 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.query.BindingSet; @@ -39,8 +38,6 @@ public class FilterObserver extends BindingSetUpdater { private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - @Override public ObservedColumn getObservedColumn() { return new ObservedColumn(FluoQueryColumns.FILTER_BINDING_SET, NotificationType.STRONG); 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index 28e31d8..c56a98f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -24,7 +24,6 @@ import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -39,8 +38,6 @@ public class JoinObserver extends BindingSetUpdater { private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - @Override public ObservedColumn getObservedColumn() { return new ObservedColumn(FluoQueryColumns.JOIN_BINDING_SET, NotificationType.STRONG); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java index e7072e7..7d96baa 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -42,7 +41,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; public class PeriodicQueryObserver extends BindingSetUpdater { private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @Override public ObservedColumn getObservedColumn() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java index b712606..5d73b2e 100644 
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -37,7 +36,6 @@ public class ProjectionObserver extends BindingSetUpdater { private static final Logger log = Logger.getLogger(ProjectionObserver.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @Override public ObservedColumn getObservedColumn() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index e6368ba..ba7beee 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -20,24 +20,24 @@ package 
org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; -import java.util.HashMap; -import java.util.Map; - import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; -import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import 
org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; @@ -46,28 +46,23 @@ import com.google.common.collect.ImmutableSet; * Performs incremental result exporting to the configured destinations. */ public class QueryResultObserver extends AbstractObserver { + private static final Logger log = Logger.getLogger(QueryResultObserver.class); - - private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - - /** - * We expect to see the same expressions a lot, so we cache the simplified forms. - */ - private final Map<String, String> simplifiedVisibilities = new HashMap<>(); - + private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + /** - * Builders for each type of result exporter we support. + * Builders for each type of {@link IncrementalBindingSetExporter} we support. */ - private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories = - ImmutableSet.<IncrementalBindingSetExporterFactory>builder() + private static final ImmutableSet<IncrementalResultExporterFactory> factories = + ImmutableSet.<IncrementalResultExporterFactory>builder() .add(new RyaBindingSetExporterFactory()) .add(new KafkaBindingSetExporterFactory()) + .add(new KafkaRyaSubGraphExporterFactory()) + .add(new RyaSubGraphExporterFactory()) + .add(new PeriodicBindingSetExporterFactory()) .build(); - - /** - * The exporters that are configured. 
- */ - private ImmutableSet<IncrementalBindingSetExporter> exporters = null; + + private ExporterManager exporterManager; @Override public ObservedColumn getObservedColumn() { @@ -79,63 +74,46 @@ public class QueryResultObserver extends AbstractObserver { */ @Override public void init(final Context context) { - final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder(); - - for(final IncrementalBindingSetExporterFactory builder : factories) { + + ExporterManager.Builder managerBuilder = ExporterManager.builder(); + + for(final IncrementalResultExporterFactory builder : factories) { try { log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); - final Optional<IncrementalBindingSetExporter> exporter = builder.build(context); + final Optional<IncrementalResultExporter> exporter = builder.build(context); if(exporter.isPresent()) { - exportersBuilder.add(exporter.get()); + managerBuilder.addIncrementalResultExporter(exporter.get()); } } catch (final IncrementalExporterFactoryException e) { log.error("Could not initialize a result exporter.", e); } } - - exporters = exportersBuilder.build(); + + exporterManager = managerBuilder.build(); } + @Override public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception { final String row = brow.toString(); - // Read the SPARQL query and it Binding Set from the row id. + // Read the queryId from the row and get the QueryMetadata. final String queryId = row.split(NODEID_BS_DELIM)[0]; + final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId); // Read the Child Binding Set that will be exported. final Bytes valueBytes = tx.get(brow, col); - final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes); - // Simplify the result's visibilities. 
- final String visibility = result.getVisibility(); - if(!simplifiedVisibilities.containsKey(visibility)) { - final String simplified = VisibilitySimplifier.simplify( visibility ); - simplifiedVisibilities.put(visibility, simplified); - } - result.setVisibility( simplifiedVisibilities.get(visibility) ); - - // Export the result using each of the provided exporters. - for(final IncrementalBindingSetExporter exporter : exporters) { - try { - exporter.export(tx, queryId, result); - } catch (final ResultExportException e) { - log.error("Could not export a binding set for query '" + queryId + "'. Binding Set: " + result, e); - } - } + exporterManager.export(metadata.getQueryType(), metadata.getExportStrategies(), queryId, valueBytes); } @Override public void close() { - if(exporters != null) { - for(final IncrementalBindingSetExporter exporter : exporters) { - try { - exporter.close(); - } catch(final Exception e) { - log.warn("Problem encountered while closing one of the exporters.", e); - } - } + try { + exporterManager.close(); + } catch (Exception e) { + log.warn("Encountered problems closing the ExporterManager."); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index 69a651e..607267a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -24,7 +24,6 @@ 
import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -39,9 +38,6 @@ public class StatementPatternObserver extends BindingSetUpdater { private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - // DAO - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - @Override public ObservedColumn getObservedColumn() { return new ObservedColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, NotificationType.STRONG);