http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index 65db02c..17ab14f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -27,7 +27,8 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -92,9 +93,9 @@ public class FluoQuery { this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); if(constructMetadata.isPresent()) { - this.type = QueryType.Construct; + this.type = QueryType.CONSTRUCT; } else { - this.type = QueryType.Projection; + this.type = QueryType.PROJECTION; } } @@ -568,8 +569,9 @@ public class FluoQuery { /** * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. 
+ * @throws UnsupportedQueryException if the query combines a construct query pattern with a sliding window (periodic) filter, or if a projection query containing aggregations is exported to Rya PCJ tables */ - public FluoQuery build() { + public FluoQuery build() throws UnsupportedQueryException { checkArgument((projectionBuilders.size() > 0 || constructBuilder != null)); Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder(); @@ -603,12 +605,18 @@ public class FluoQuery { aggregateMetadata.put(entry.getKey(), entry.getValue().build()); } + QueryMetadata qMetadata = queryBuilder.build(); + if(constructBuilder != null) { if(periodicQueryMetadata != null) { - throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported."); + throw new UnsupportedQueryException("Queries containing sliding window filters and construct query patterns are not supported."); } - return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + return new FluoQuery(qMetadata, projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } else { + if(aggregationBuilders.size() > 0 && qMetadata.getQueryType() == QueryType.PROJECTION && qMetadata.getExportStrategies().contains(ExportStrategy.RYA)) { + throw new UnsupportedQueryException("Exporting to Rya PCJ tables is currently not supported for queries containing aggregations."); + } + return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 2eae4ff..8569a48 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -162,32 +162,6 @@ public class FluoQueryColumns { */ public static final Column TRIPLES = new Column("triples", "SPO"); - /** - * Stores the Rya assigned PCJ ID that the query's results reflect. This - * value defines where the results will be exported to. - * <p> - * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the results of this query will be exported to.</td> </tr> - * </table> - * </p> - */ - public static final Column RYA_PCJ_ID = new Column("query", "ryaPcjId"); - - /** - * Associates a PCJ ID with a Query ID. This enables a quick lookup of the Query ID from the PCJ ID and is useful of Deleting PCJs. - * <p> - * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>PCJ ID</td> <td>ryaPcjId:queryId</td> <td>Identifies which Query ID is associated with the given PCJ ID.</td> </tr> - * </table> - * </p> - */ - public static final Column PCJ_ID_QUERY_ID = new Column("ryaPcjId", "queryId"); - - // Sparql to Query ID used to list all queries that are in the system. 
- public static final Column QUERY_ID = new Column("sparql", "queryId"); - // Query Metadata columns. public static final Column QUERY_NODE_ID = new Column(QUERY_METADATA_CF, "nodeId"); public static final Column QUERY_VARIABLE_ORDER = new Column(QUERY_METADATA_CF, "variableOrder"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 1c34836..d5d9fe7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -34,11 +34,11 @@ import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; @@ -585,21 +585,13 @@ public class 
FluoQueryMetadataDAO { requireNonNull(tx); requireNonNull(query); - QueryMetadata queryMetadata = query.getQueryMetadata(); - final String sparql = queryMetadata.getSparql(); - final String queryId = queryMetadata.getNodeId(); - final String pcjId = queryMetadata.getExportId(); - // The results of the query are eventually exported to an instance // of Rya, so store the Rya ID for the PCJ. - tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); - tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); - write(tx, queryMetadata); + write(tx, query.getQueryMetadata()); // Write the rest of the metadata objects. - if (query.getQueryType() == QueryType.Construct) { + if (query.getQueryType() == QueryType.CONSTRUCT) { ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); write(tx, constructMetadata); } @@ -636,8 +628,9 @@ public class FluoQueryMetadataDAO { * @param sx - The snapshot that will be used to read the metadata from the Fluo table. (not null) * @param queryId - The ID of the query whose nodes will be read. (not null) * @return The {@link FluoQuery} that was read from table. 
+ * @throws UnsupportedQueryException */ - public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) { + public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) throws UnsupportedQueryException { requireNonNull(sx); requireNonNull(queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java index e46b405..40c9e03 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -24,12 +24,11 @@ import java.util.Optional; import java.util.Set; import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Objects; -import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 7bf6f45..7b21575 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -40,12 +40,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; @@ -106,7 +106,7 @@ public class SparqlFluoQueryBuilder { //Default behavior is to export to Kafka - subject to change when user can //specify their own export strategy - private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka)); + private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.KAFKA)); public SparqlFluoQueryBuilder setSparql(String sparql) { this.sparql = Preconditions.checkNotNull(sparql); @@ -145,7 +145,7 @@ public class SparqlFluoQueryBuilder { return this; } - public FluoQuery build() { + 
public FluoQuery build() throws UnsupportedQueryException { Preconditions.checkNotNull(sparql); Preconditions.checkNotNull(queryId); Preconditions.checkNotNull(exportStrategies); @@ -172,10 +172,12 @@ public class SparqlFluoQueryBuilder { QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); //sets {@link QueryType} and VariableOrder setVarOrderAndQueryType(queryBuilder, te); - queryBuilder.setSparql(sparql); - queryBuilder.setChildNodeId(childNodeId); - queryBuilder.setExportStrategies(exportStrategies); - queryBuilder.setJoinBatchSize(joinBatchSize); + queryBuilder + .setSparql(sparql) + .setChildNodeId(childNodeId) + .setExportStrategies(exportStrategies) + .setJoinBatchSize(joinBatchSize); + fluoQueryBuilder.setQueryMetadata(queryBuilder); setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId); @@ -800,7 +802,7 @@ public class SparqlFluoQueryBuilder { } if(queryType == null) { - queryType = QueryType.Projection; + queryType = QueryType.PROJECTION; } super.meet(node); } @@ -811,14 +813,14 @@ public class SparqlFluoQueryBuilder { } if(queryType == null) { - queryType = QueryType.Construct; + queryType = QueryType.CONSTRUCT; } super.meet(node); } public void meetOther(final QueryModelNode node) throws Exception { if (node instanceof PeriodicQueryNode) { - queryType = QueryType.Periodic; + queryType = QueryType.PERIODIC; } else { super.meetOther(node); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java new file mode 100644 index 0000000..155b8da --- 
/dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; + +/** + * This exception is thrown if the Rya Fluo Application does not support + * the given SPARQL query. This could happen for a number of reasons. The + * two most common reasons are that the query possesses some combination of query nodes + * that the application can't evaluate, or that the {@link ExportStrategy} of the query + * is incompatible with one of its query nodes.
+ * + */ +public class UnsupportedQueryException extends Exception { + private static final long serialVersionUID = 1L; + + public UnsupportedQueryException(final String message) { + super(message); + } + + public UnsupportedQueryException(final String message, final Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java index ac41160..7a5b439 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.util; import java.util.List; +import java.util.UUID; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; @@ -60,6 +61,13 @@ public class FluoQueryUtils { } /** + * @return - A new pcjId, which is a UUID with all dashes removed + */ + public static String createNewPcjId() { + return UUID.randomUUID().toString().replaceAll("-", ""); + } + + /** * Uses a {@link NodeIdCollector} visitor to do a pre-order traverse of the * FluoQuery and gather the nodeIds of the metadata nodes. 
* @param query - FluoQuery to be traversed http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java index b9c10d4..cd21ed6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -27,12 +27,13 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameterBase; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters; import org.junit.Test; /** - * Tests the methods of {@link KafkaExportParameters}. + * Tests the methods of {@link KafkaExportParameterBase}. */ public class KafkaExportParametersTest { @@ -41,19 +42,19 @@ public class KafkaExportParametersTest { final Map<String, String> params = new HashMap<>(); // Load some values into the params using the wrapper. - final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); - kafkaParams.setExportToKafka(true); + final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params); + kafkaParams.setUseKafkaBindingSetExporter(true); // Ensure the params map has the expected values. 
final Map<String, String> expectedParams = new HashMap<>(); - expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true"); - assertTrue(kafkaParams.isExportToKafka()); + expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "true"); + assertTrue(kafkaParams.getUseKafkaBindingSetExporter()); assertEquals(expectedParams, params); // now go the other way. - expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false"); - kafkaParams.setExportToKafka(false); - assertFalse(kafkaParams.isExportToKafka()); + expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "false"); + kafkaParams.setUseKafkaBindingSetExporter(false); + assertFalse(kafkaParams.getUseKafkaBindingSetExporter()); assertEquals(expectedParams, params); } @Test @@ -68,7 +69,7 @@ public class KafkaExportParametersTest { // Make sure export key1 is NOT kept separate from producer config key1 // This is a change, originally they were kept separate. params.put(key1, value1First); - final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + final KafkaExportParameterBase kafkaParams = new KafkaExportParameterBase(params); // Load some values into the properties using the wrapper. Properties props = new Properties(); props.put(key1, value1Second); @@ -87,8 +88,8 @@ public class KafkaExportParametersTest { final Map<String, String> params = new HashMap<>(); // Ensure an unconfigured parameters map will say kafka export is disabled. 
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); - assertFalse(kafkaParams.isExportToKafka()); + final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params); + assertFalse(kafkaParams.getUseKafkaBindingSetExporter()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java index 9ac5139..5653312 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java @@ -37,7 +37,7 @@ public class RyaExportParametersTest { // Load some values into the params using the wrapper. final RyaExportParameters ryaParams = new RyaExportParameters(params); - ryaParams.setExportToRya(true); + ryaParams.setUseRyaBindingSetExporter(true); ryaParams.setAccumuloInstanceName("demoAccumulo"); ryaParams.setZookeeperServers("zoo1;zoo2"); ryaParams.setExporterUsername("fluo"); @@ -45,7 +45,7 @@ public class RyaExportParametersTest { // Ensure the params map has the expected values. 
final Map<String, String> expectedParams = new HashMap<>(); - expectedParams.put(RyaExportParameters.CONF_EXPORT_TO_RYA, "true"); + expectedParams.put(RyaExportParameters.CONF_USE_RYA_BINDING_SET_EXPORTER, "true"); expectedParams.put(RyaExportParameters.CONF_ACCUMULO_INSTANCE_NAME, "demoAccumulo"); expectedParams.put(RyaExportParameters.CONF_ZOOKEEPER_SERVERS, "zoo1;zoo2"); expectedParams.put(RyaExportParameters.CONF_EXPORTER_USERNAME, "fluo"); @@ -60,6 +60,6 @@ public class RyaExportParametersTest { // Ensure an unconfigured parameters map will say rya export is disabled. final RyaExportParameters ryaParams = new RyaExportParameters(params); - assertFalse(ryaParams.isExportToRya()); + assertFalse(ryaParams.getUseRyaBindingSetExporter()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java index b40ba3f..55455a7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java @@ -154,7 +154,7 @@ public class PeriodicQueryUtilTest { } @Test - public void testFluoQueryVarOrders() throws MalformedQueryException { + public void testFluoQueryVarOrders() throws MalformedQueryException, UnsupportedQueryException { String query = "prefix function: <http://org.apache.rya/function#> " //n + "prefix time: <http://www.w3.org/2006/time#> " //n + "select (count(?obs) as ?total) where {" //n 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java index 5c89a75..48f2f39 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java @@ -29,7 +29,7 @@ import org.junit.Test; public class QueryMetadataVisitorTest { @Test - public void builderTest() { + public void builderTest() throws UnsupportedQueryException { String query = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n + "select ?id (count(?obs) as ?total) where {" // n http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java index 901f39d..cc74f6b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import 
org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ArgumentsException; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ExecutionException; import org.apache.rya.indexing.pcj.fluo.client.command.CountUnprocessedStatementsCommand; @@ -152,6 +153,9 @@ public class PcjAdminClient { System.err.println("Could not execute the command."); e.printStackTrace(); System.exit(-1); + } catch (UnsupportedQueryException e) { + System.err.println("Could not execute the command because the query is invalid."); + e.printStackTrace(); } finally { log.trace("Shutting down the PCJ Admin Client."); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java index 2b3b105..a944b33 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java @@ -24,6 +24,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.rdftriplestore.RyaSailRepository; /** @@ -57,13 +58,14 @@ public interface PcjAdminClientCommand { * @param 
rya - A connection to the Rya instance used to search for historic PCJ matches. (not null) * @param client - A connection to the Fluo app that is updating the PCJs. (not null) * @param args - Command line arguments that configure how the command will execute. (not null) + * @throws UnsupportedQueryException */ public void execute( final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, - final String[] args) throws ArgumentsException, ExecutionException; + final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException; /** * A {@link PcjAdminClientCommand} could not be executed because of a problem with http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java index 3f335f4..78515d9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -42,6 +42,7 @@ import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; import 
org.apache.rya.indexing.pcj.storage.PcjException; @@ -94,7 +95,7 @@ public class NewQueryCommand implements PcjAdminClientCommand { } @Override - public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException { + public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException { checkNotNull(accumulo); checkNotNull(fluo); checkNotNull(args); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java index 675a844..2a7f787 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.client.util.QueryReportRenderer; import com.beust.jcommander.JCommander; @@ -69,7 +70,7 @@ public class QueryReportCommand implements PcjAdminClientCommand { } @Override - public void 
execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException { + public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException { checkNotNull(accumulo); checkNotNull(ryaTablePrefix); checkNotNull(rya); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java index f44db6c..d1b3e25 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java @@ -20,12 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.client.util; import static com.google.common.base.Preconditions.checkNotNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.commons.lang3.StringUtils; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; @@ -38,6 +35,9 @@ 
import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Pretty renders a {@link QueryReport}. */ @@ -70,7 +70,7 @@ public class QueryReportRenderer { - if (metadata.getQueryType() == QueryType.Construct) { + if (metadata.getQueryType() == QueryType.CONSTRUCT) { builder.appendItem( new ReportItem("") ); final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java index e8f10b8..1ae02dd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java @@ -304,7 +304,7 @@ public class DemoDriver { // Provide export parameters child test classes may provide to the export observer. 
final HashMap<String, String> exportParams = new HashMap<>(); final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); + ryaParams.setUseRyaBindingSetExporter(true); ryaParams.setAccumuloInstanceName(accumulo.getInstanceName()); ryaParams.setZookeeperServers(accumulo.getZooKeepers()); ryaParams.setExporterUsername("root"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index f25b573..4070849 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -18,7 +18,6 @@ */ package org.apache.rya.indexing.pcj.fluo.demo; -import java.io.IOException; import java.util.Set; import org.apache.accumulo.core.client.Connector; @@ -35,22 +34,20 @@ import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.resolver.RyaToRdfConversions; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.rdftriplestore.RyaSailRepository; import org.openrdf.model.Statement; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; -import org.openrdf.sail.SailException; import com.google.common.base.Optional; import com.google.common.collect.Sets; @@ -181,7 +178,7 @@ public class FluoAndHistoricPcjsDemo implements Demo { // Tell the Fluo app to maintain it. new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix); - } catch (MalformedQueryException | PcjException | RyaDAOException e) { + } catch (MalformedQueryException | PcjException | RyaDAOException | UnsupportedQueryException e) { throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. 
Exiting.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index 263a19e..7676657 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -32,6 +32,7 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; @@ -53,7 +54,7 @@ import com.google.common.collect.Sets; public class GetPcjMetadataIT extends RyaExportITBase { @Test - public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException { + public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException, UnsupportedQueryException { final String sparql = "SELECT ?x " + "WHERE { " + @@ -82,7 +83,7 @@ public class GetPcjMetadataIT extends RyaExportITBase 
{ } @Test - public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException { + public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException, UnsupportedQueryException { final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java index e3914bd..3310690 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -18,7 +18,7 @@ */ package org.apache.rya.indexing.pcj.fluo.api; -import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_ID; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_NODE_ID; import static org.junit.Assert.assertEquals; import java.util.List; @@ -49,10 +49,10 @@ public class ListQueryIdsIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Store a few SPARQL/Query ID pairs in the Fluo table. 
try(Transaction tx = fluoClient.newTransaction()) { - tx.set("SPARQL_3", QUERY_ID, "ID_3"); - tx.set("SPARQL_1", QUERY_ID, "ID_1"); - tx.set("SPARQL_4", QUERY_ID, "ID_4"); - tx.set("SPARQL_2", QUERY_ID, "ID_2"); + tx.set("SPARQL_3", QUERY_NODE_ID, "ID_3"); + tx.set("SPARQL_1", QUERY_NODE_ID, "ID_1"); + tx.set("SPARQL_4", QUERY_NODE_ID, "ID_4"); + tx.set("SPARQL_2", QUERY_NODE_ID, "ID_2"); tx.commit(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index 315dddb..45492de 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -29,9 +29,9 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; @@ -148,11 +148,11 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Create the object that will be serialized. String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY); final QueryMetadata.Builder builder = QueryMetadata.builder(queryId); - builder.setQueryType(QueryType.Projection); + builder.setQueryType(QueryType.PROJECTION); builder.setVarOrder(new VariableOrder("y;s;d")); builder.setSparql("sparql string"); builder.setChildNodeId("childNodeId"); - builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka))); + builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.KAFKA))); final QueryMetadata originalMetadata = builder.build(); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -338,7 +338,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { } @Test - public void fluoQueryTest() throws MalformedQueryException { + public void fluoQueryTest() throws MalformedQueryException, UnsupportedQueryException { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized. 
@@ -357,7 +357,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); final FluoQuery originalQuery = builder.build(); - assertEquals(QueryType.Projection, originalQuery.getQueryType()); + assertEquals(QueryType.PROJECTION, originalQuery.getQueryType()); assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -379,7 +379,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { } @Test - public void fluoConstructQueryTest() throws MalformedQueryException { + public void fluoConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized. @@ -398,7 +398,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); final FluoQuery originalQuery = builder.build(); - assertEquals(QueryType.Construct, originalQuery.getQueryType()); + assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType()); assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -421,7 +421,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { @Test - public void fluoNestedQueryTest() throws MalformedQueryException { + public void fluoNestedQueryTest() throws MalformedQueryException, UnsupportedQueryException { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized. 
@@ -442,7 +442,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); final FluoQuery originalQuery = builder.build(); - assertEquals(QueryType.Projection, originalQuery.getQueryType()); + assertEquals(QueryType.PROJECTION, originalQuery.getQueryType()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Write it to the Fluo table. @@ -463,7 +463,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { } @Test - public void fluoNestedConstructQueryTest() throws MalformedQueryException { + public void fluoNestedConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized. @@ -488,7 +488,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); final FluoQuery originalQuery = builder.build(); - assertEquals(QueryType.Construct, originalQuery.getQueryType()); + assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Write it to the Fluo table. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java index 32d0e41..47a2f29 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java @@ -53,6 +53,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -343,7 +344,7 @@ public class BatchIT extends RyaExportITBase { return statements; } - private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) { + private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) throws UnsupportedQueryException { List<String> nodeStrings; try (Snapshot sx = fluoClient.newSnapshot()) { FluoQuery query = dao.readFluoQuery(sx, queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- 
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 7c4caa4..a1d76cb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -33,6 +33,7 @@ import org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj; @@ -79,7 +80,7 @@ public class CreateDeleteIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(20, rows.size()); + assertEquals(18, rows.size()); // Delete the PCJ from the Fluo application. new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); @@ -111,7 +112,7 @@ public class CreateDeleteIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(12, rows.size()); + assertEquals(10, rows.size()); // Delete the PCJ from the Fluo application. new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); @@ -130,7 +131,7 @@ public class CreateDeleteIT extends RyaExportITBase { // Register the PCJ with Rya. 
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector()); - final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT)); // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index f9f55d0..8911f56 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -92,6 +92,8 @@ public class KafkaExportIT extends KafkaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadData(sparql, statements); + FluoITHelper.printFluoTable(super.getFluoConfiguration()); + // The expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResult = new HashSet<>(); @@ -590,9 +592,9 @@ public class KafkaExportIT extends KafkaExportITBase { // Read all of the results from the Kafka topic. 
final Set<VisibilityBindingSet> results = new HashSet<>(); - try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { - final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); - final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator(); while (recordIterator.hasNext()) { results.add( recordIterator.next().value() ); } @@ -607,9 +609,9 @@ public class KafkaExportIT extends KafkaExportITBase { // Read the results from the Kafka topic. The last one has the final aggregation result. VisibilityBindingSet result = null; - try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { - final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); - final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator(); while (recordIterator.hasNext()) { result = recordIterator.next().value(); } @@ -625,9 +627,9 @@ public class KafkaExportIT extends KafkaExportITBase { // The key in this map is a Binding Set containing only the group by variables. 
final Map<BindingSet, VisibilityBindingSet> results = new HashMap<>(); - try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { - final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); - final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator(); while (recordIterator.hasNext()) { final VisibilityBindingSet visBindingSet = recordIterator.next().value(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java index ca8de0d..b2944ca 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java @@ -33,13 +33,12 @@ import java.util.stream.Collectors; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaSubGraph; @@ -48,13 +47,14 @@ import org.apache.rya.api.domain.RyaURI; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; @@ -88,22 +88,18 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); + observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName())); + // 
Configure the export observer to export new PCJ results to the mini // accumulo cluster. final HashMap<String, String> exportParams = new HashMap<>(); - final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); - kafkaParams.setExportToKafka(true); - - // Configure the Kafka Producer - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); - kafkaParams.addAllProducerConfig(producerConfig); + final KafkaSubGraphExporterParameters kafkaParams = new KafkaSubGraphExporterParameters(exportParams); + kafkaParams.setUseKafkaSubgraphExporter(true); + kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT); - final ObserverSpecification exportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); observers.add(exportObserverConfig); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 6ecec02..0aefaca 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ 
-34,9 +34,11 @@ import javax.xml.datatype.DatatypeFactory; import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; @@ -877,7 +879,7 @@ public class QueryIT extends RyaExportITBase { runTest(query, statements, expectedResults, ExporterType.Periodic); } - @Test(expected= IllegalArgumentException.class) + @Test(expected= UnsupportedQueryException.class) public void nestedConstructPeriodicQueryWithAggregationAndGroupBy() throws Exception { String query = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n @@ -924,7 +926,7 @@ public class QueryIT extends RyaExportITBase { PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName()); String periodicId = periodicStorage.createPeriodicQuery(sparql); try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { - new CreateFluoPcj().createPcj(periodicId, sparql, fluo); + new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo); } addStatementsAndWait(statements); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index c828a20..ed9ce60 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -37,25 +37,25 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.Install.InstallConfiguration; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; -import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; @@ -74,6 +74,8 @@ import org.openrdf.model.Statement; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; +import com.google.common.collect.Sets; + import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; @@ -119,41 +121,20 @@ public class KafkaExportITBase extends AccumuloExportITBase { observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); + observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName())); // Configure the export observer to export new PCJ results to the mini // accumulo cluster. 
final HashMap<String, String> exportParams = new HashMap<>(); - - final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); - kafkaParams.setExportToKafka(true); - - // Configure the Kafka Producer - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); - kafkaParams.addAllProducerConfig(producerConfig); + final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams); + kafkaParams.setUseKafkaBindingSetExporter(true); + kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT); + + final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams); + kafkaConstructParams.setUseKafkaSubgraphExporter(true); final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); observers.add(exportObserverConfig); - - //create construct query observer and tell it not to export to Kafka - //it will only add results back into Fluo - HashMap<String, String> constructParams = new HashMap<>(); - final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams); - kafkaConstructParams.setExportToKafka(true); - - // Configure the Kafka Producer - final Properties constructProducerConfig = new Properties(); - constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
RyaSubGraphKafkaSerDe.class.getName()); - kafkaConstructParams.addAllProducerConfig(constructProducerConfig); - - final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), - constructParams); - observers.add(constructExportObserverConfig); // Add the observers to the Fluo Configuration. super.getFluoConfiguration().addObservers(observers); @@ -323,21 +304,19 @@ public class KafkaExportITBase extends AccumuloExportITBase { consumer.close(); } - protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) { + protected KafkaConsumer<String, VisibilityBindingSet> makeConsumer(final String TopicName) { // setup consumer final Properties consumerProps = new Properties(); consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoVisibilityBindingSetSerializer.class.getName()); // to make sure the consumer starts from the beginning of the topic consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); + final KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Arrays.asList(TopicName)); return consumer; } @@ -353,7 +332,7 @@ public 
class KafkaExportITBase extends AccumuloExportITBase { final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); - final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA)); // Write the data to Rya. final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java index 9c5732f..1c02db3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java @@ -65,7 +65,8 @@ public class RyaExportITBase extends FluoITBase { // Configure the export observer to export new PCJ results to the mini accumulo cluster. 
final HashMap<String, String> exportParams = new HashMap<>(); final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); + ryaParams.setUseRyaBindingSetExporter(true); + ryaParams.setUsePeriodicBindingSetExporter(true); ryaParams.setRyaInstanceName(getRyaInstanceName()); ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java index 4d1bc75..cf24974 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java @@ -26,6 +26,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.core.client.FluoClientImpl; import org.apache.fluo.recipes.test.AccumuloExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; import 
org.apache.rya.periodic.notification.notification.TimestampedNotification; @@ -38,7 +39,7 @@ import org.junit.Assert; public class PeriodicNotificationProviderIT extends AccumuloExportITBase { @Test - public void testProvider() throws MalformedQueryException, InterruptedException { + public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException { String sparql = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java index 27acc9c..bb98b7f 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java @@ -38,12 +38,10 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.Span; import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; 
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; @@ -252,14 +250,14 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { } } - private void compareFluoCounts(FluoClient client, String queryId, long bin) { + private void compareFluoCounts(FluoClient client, String pcjId, long bin) { QueryBindingSet bs = new QueryBindingSet(); bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG)); VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID); try(Snapshot sx = client.newSnapshot()) { - String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString(); + String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); Set<String> ids = new HashSet<>(); PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids); for(String id: ids) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java index 6aade52..60a3e7c 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java @@ 
-21,8 +21,10 @@ package org.apache.rya.periodic.notification.api; import java.util.Optional; import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; @@ -32,7 +34,7 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.evaluation.function.Function; -import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; /** * Object that creates a Periodic Query. A Periodic Query is any query @@ -82,17 +84,22 @@ public class CreatePeriodicQuery { Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); if(optNode.isPresent()) { PeriodicQueryNode periodicNode = optNode.get(); + String pcjId = FluoQueryUtils.createNewPcjId(); + + //register query with Fluo CreateFluoPcj createPcj = new CreateFluoPcj(); - String queryId = createPcj.createPcj(sparql, fluoClient).getQueryId(); - queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId); - periodicStorage.createPeriodicQuery(queryId, sparql); - PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod()) + createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient); + + //register query with PeriodicResultStorage table + periodicStorage.createPeriodicQuery(pcjId, sparql); + //create notification + PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod()) .timeUnit(periodicNode.getUnit()).build(); 
return notification; } else { throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); } - } catch (MalformedQueryException | PeriodicQueryStorageException e) { + } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) { throw new RuntimeException(e); } }
