http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index 2084907..4b6f44e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -35,9 +35,13 @@ public class IncrementalUpdateConstants { public static final String FILTER_PREFIX = "FILTER"; public static final String AGGREGATION_PREFIX = "AGGREGATION"; public static final String QUERY_PREFIX = "QUERY"; + public static final String PROJECTION_PREFIX = "PROJECTION"; public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; + public static enum QueryType{Construct, Projection, Periodic}; + public static enum ExportStrategy{Rya, Kafka}; + public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index 9b65b34..0f448a6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -139,9 +139,9 @@ public class JoinResultUpdater { // Create the Row Key for the emitted binding set. It does not contain visibilities. final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult); - - // Only insert the join Binding Set if it is new. - if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null) { + + // Only insert the join Binding Set if it is new or BindingSet contains values not used in resultRow. + if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || joinVarOrder.getVariableOrders().size() < newJoinResult.size()) { // Create the Node Value. It does contain visibilities. final Bytes nodeValueBytes = BS_SERDE.serialize(newJoinResult); @@ -210,18 +210,28 @@ public class JoinResultUpdater { final NodeType nodeType = NodeType.fromNodeId(nodeId).get(); switch(nodeType) { case STATEMENT_PATTERN: - return queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder(); - + return removeBinIdFromVarOrder(queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder()); case FILTER: - return queryDao.readFilterMetadata(tx, nodeId).getVariableOrder(); - + return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, nodeId).getVariableOrder()); case JOIN: - return queryDao.readJoinMetadata(tx, nodeId).getVariableOrder(); - + return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, nodeId).getVariableOrder()); + case PROJECTION: + return removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, nodeId).getVariableOrder()); default: throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId); } } + + private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) { + List<String> varOrderList = varOrder.getVariableOrders(); + if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) { + List<String> updatedVarOrderList = Lists.newArrayList(varOrderList); + updatedVarOrderList.remove(0); + return new VariableOrder(updatedVarOrderList); + } else { + return varOrder; + } + } /** * Assuming that the common variables between two children are already @@ -285,6 +295,9 @@ public class JoinResultUpdater { case JOIN: column = FluoQueryColumns.JOIN_BINDING_SET; break; + case PROJECTION: + column = FluoQueryColumns.PROJECTION_BINDING_SET; + break; default: throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter."); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java index b8fc2d9..a6fc5ea 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -24,10 +24,12 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; import java.util.List; +import java.util.UUID; import org.apache.fluo.api.data.Column; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; @@ -39,13 +41,14 @@ import com.google.common.base.Optional; * Represents the different types of nodes that a Query may have. */ public enum NodeType { - PERIODIC_QUERY(QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET), - FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET), - JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), - STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), - QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET), - AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET), - CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS); + PERIODIC_QUERY(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX, QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET), + FILTER (IncrementalUpdateConstants.FILTER_PREFIX, QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET), + JOIN(IncrementalUpdateConstants.JOIN_PREFIX, QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), + STATEMENT_PATTERN(IncrementalUpdateConstants.SP_PREFIX, QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), + QUERY(IncrementalUpdateConstants.QUERY_PREFIX, QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET), + AGGREGATION(IncrementalUpdateConstants.AGGREGATION_PREFIX, QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET), + PROJECTION(IncrementalUpdateConstants.PROJECTION_PREFIX, QueryNodeMetadataColumns.PROJECTION_COLUMNS, FluoQueryColumns.PROJECTION_BINDING_SET), + CONSTRUCT(IncrementalUpdateConstants.CONSTRUCT_PREFIX, QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS); //Metadata Columns associated with given NodeType private QueryNodeMetadataColumns metadataColumns; @@ -53,15 +56,25 @@ public enum NodeType { //Column where results are stored for given NodeType private Column resultColumn; + //Prefix for the given node type + private String nodePrefix; /** * Constructs an instance of {@link NodeType}. * * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null) * @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null) */ - private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) { + private NodeType(String nodePrefix, QueryNodeMetadataColumns metadataColumns, Column resultColumn) { this.metadataColumns = requireNonNull(metadataColumns); this.resultColumn = requireNonNull(resultColumn); + this.nodePrefix = requireNonNull(nodePrefix); + } + + /** + * @return the prefix for the given node type + */ + public String getNodeTypePrefix() { + return nodePrefix; } /** @@ -103,10 +116,38 @@ public enum NodeType { type = AGGREGATION; } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) { type = CONSTRUCT; + } else if(nodeId.startsWith(PROJECTION_PREFIX)) { + type = PROJECTION; } else if(nodeId.startsWith(PERIODIC_QUERY_PREFIX)) { type = PERIODIC_QUERY; } return Optional.fromNullable(type); } + + /** + * Creates an id for a given NodeType that is of the form {@link NodeType#getNodeTypePrefix()} + "_" + pcjId, + * where the pcjId is an auto generated UUID with all dashes removed. + * @param type {@link NodeType} + * @return id for the given NodeType + */ + public static String generateNewFluoIdForType(NodeType type) { + String unique = UUID.randomUUID().toString().replaceAll("-", ""); + // Put them together to create the Node ID. + return type.getNodeTypePrefix() + "_" + unique; + } + + /** + * Creates an id for a given NodeType that is of the form {@link NodeType#getNodeTypePrefix()} + "_" + pcjId + * + * @param type {@link NodeType} + * @return id for the given NodeType + */ + public static String generateNewIdForType(NodeType type, String pcjId) { + // Put them together to create the Node ID. + return type.getNodeTypePrefix() + "_" + pcjId; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java index ae4912b..cb331cf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java @@ -34,7 +34,6 @@ import org.openrdf.model.Literal; import org.openrdf.model.Value; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.Binding; import org.openrdf.query.algebra.evaluation.QueryBindingSet; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java new file mode 100644 index 0000000..f9d8257 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; +import org.openrdf.query.BindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates the results of a Projection node when one of its children has added a + * new Binding Set to its results. + */ +@DefaultAnnotation(NonNull.class) +public class ProjectionResultUpdater { + private static final Logger log = Logger.getLogger(QueryResultUpdater.class); + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + + /** + * Updates the results of a Projection node when one of its children has added a + * new Binding Set to its results. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @param childBindingSet - A binding set that the query's child node has emmitted. (not null) + * @param projectionMetadata - The metadata of the Query whose results will be updated. (not null) + * @throws Exception A problem caused the update to fail. + */ + public void updateProjectionResults( + final TransactionBase tx, + final VisibilityBindingSet childBindingSet, + final ProjectionMetadata projectionMetadata) throws Exception { + checkNotNull(tx); + checkNotNull(childBindingSet); + checkNotNull(projectionMetadata); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Node ID: " + projectionMetadata.getNodeId() + "\n" + + "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" + + "Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" + + "Child Binding Set:\n" + childBindingSet + "\n"); + + // Create the query's Binding Set from the child node's binding set. + final VariableOrder queryVarOrder = projectionMetadata.getVariableOrder(); + final VariableOrder projectionVarOrder = projectionMetadata.getProjectedVars(); + final BindingSet queryBindingSet = BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet); + + // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables. + Bytes resultRow = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet); + + // Create the Binding Set that goes in the Node Value. It does contain visibilities. + final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility())); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "New Binding Set: " + childBindingSet + "\n"); + + tx.set(resultRow, FluoQueryColumns.PROJECTION_BINDING_SET, nodeValueBytes); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index 44fc9bd..37d7256 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -23,16 +23,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; -import org.openrdf.query.BindingSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -45,7 +41,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; public class QueryResultUpdater { private static final Logger log = Logger.getLogger(QueryResultUpdater.class); - private static final FluoQueryMetadataDAO METADATA_DA0 = new FluoQueryMetadataDAO(); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); /** @@ -73,23 +68,12 @@ public class QueryResultUpdater { // Create the query's Binding Set from the child node's binding set. final VariableOrder queryVarOrder = queryMetadata.getVariableOrder(); - final BindingSet queryBindingSet = BindingSetUtil.keepBindings(queryVarOrder, childBindingSet); // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables. - final Bytes resultRow; - - final String childNodeId = queryMetadata.getChildNodeId(); - final boolean isGrouped = childNodeId.startsWith( IncrementalUpdateConstants.AGGREGATION_PREFIX ); - if(isGrouped) { - final AggregationMetadata aggMetadata = METADATA_DA0.readAggregationMetadata(tx, childNodeId); - final VariableOrder groupByVars = aggMetadata.getGroupByVariableOrder(); - resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), groupByVars, queryBindingSet); - } else { - resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, queryBindingSet); - } + final Bytes resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet); // Create the Binding Set that goes in the Node Value. It does contain visibilities. - final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet,childBindingSet.getVisibility())); + final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); log.trace( "Transaction ID: " + tx.getStartTimestamp() + "\n" + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java index a15743f..fa27b46 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java @@ -30,8 +30,6 @@ import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; -import com.google.common.base.Preconditions; - /** * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index 3a731c2..7d0fd5e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -31,6 +31,7 @@ import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater; +import org.apache.rya.indexing.pcj.fluo.app.ProjectionResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; @@ -38,6 +39,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -61,6 +63,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { private final QueryResultUpdater queryUpdater = new QueryResultUpdater(); private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater(); private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater(); + private final ProjectionResultUpdater projectionUpdater = new ProjectionResultUpdater(); private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater(); @Override @@ -107,6 +110,15 @@ public abstract class BindingSetUpdater extends AbstractObserver { } break; + case PROJECTION: + final ProjectionMetadata projectionQuery = queryDao.readProjectionMetadata(tx, parentNodeId); + try { + projectionUpdater.updateProjectionResults(tx, observedBindingSet, projectionQuery); + } catch (final Exception e) { + throw new RuntimeException("Could not process a Query node.", e); + } + break; + case CONSTRUCT: final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId); try{ @@ -154,7 +166,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { default: - throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType); + throw new IllegalArgumentException("The parent node's NodeType must be of type Aggregation, Projection, ConstructQuery, Filter, Join, PeriodicBin or Query, but was " + parentNodeType); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java new file mode 100644 index 0000000..b712606 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + +/** + * Performs incremental result exporting to the configured destinations. + */ +public class ProjectionObserver extends BindingSetUpdater { + private static final Logger log = Logger.getLogger(ProjectionObserver.class); + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.PROJECTION_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception { + requireNonNull(tx); + requireNonNull(row); + + // Read the Filter metadata. + final String projectionNodeId = BindingSetRow.make(row).getNodeId(); + final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId); + + // Read the Visibility Binding Set from the value. + final Bytes valueBytes = tx.get(row, FluoQueryColumns.PROJECTION_BINDING_SET); + final VisibilityBindingSet projectionBindingSet = BS_SERDE.deserialize(valueBytes); + + // Figure out which node needs to handle the new metadata. + final String parentNodeId = projectionMetadata.getParentNodeId(); + + return new Observation(projectionNodeId, projectionBindingSet, parentNodeId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index fbdca08..e6368ba 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -107,7 +107,7 @@ public class QueryResultObserver extends AbstractObserver { // Read the Child Binding Set that will be exported. final Bytes valueBytes = tx.get(brow, col); final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes); - + // Simplify the result's visibilities. final String visibility = result.getVisibility(); if(!simplifiedVisibilities.containsKey(visibility)) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java index ff42a0f..eaa072f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java @@ -287,7 +287,7 @@ public class AggregationMetadata extends CommonNodeMetadata { * Builds instances of {@link AggregationMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder { private final String nodeId; private VariableOrder varOrder; @@ -317,7 +317,7 @@ public class AggregationMetadata extends CommonNodeMetadata { * single variable because aggregations are only able to emit the aggregated value. * @return This builder so that method invocations may be chained. */ - public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + public Builder setVarOrder(@Nullable final VariableOrder varOrder) { this.varOrder = varOrder; return this; } @@ -350,6 +350,10 @@ public class AggregationMetadata extends CommonNodeMetadata { this.childNodeId = childNodeId; return this; } + + public String getChildNodeId() { + return childNodeId; + } /** * @param aggregation - An aggregation that will be performed over the BindingSets that are emitted from the child node. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java index e54acf1..a20fe4d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java @@ -99,4 +99,18 @@ public abstract class CommonNodeMetadata { .append("}") .toString(); } + + /** + * Base interface for all metadata Builders. Using this type def + * allows for the implementation of a Builder visitor for navigating + * the Builder tree. + * + */ + public static interface Builder { + + public String getNodeId(); + + public VariableOrder getVariableOrder(); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java index e836c5d..6bf968e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java @@ -38,7 +38,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { private String childNodeId; private ConstructGraph graph; - private String sparql; + private String parentNodeId; /** * Creates ConstructQueryMetadata object from the provided metadata arguments. @@ -47,21 +47,11 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statement representing construct graph * @param sparql - SPARQL query containing construct graph */ - public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) { - super(nodeId, new VariableOrder("subject", "predicate", "object")); - Preconditions.checkNotNull(childNodeId); - Preconditions.checkNotNull(graph); - Preconditions.checkNotNull(sparql); - this.childNodeId = childNodeId; - this.graph = graph; - this.sparql = sparql; - } - - /** - * @return sparql query string representing this construct query - */ - public String getSparql() { - return sparql; + public ConstructQueryMetadata(String nodeId, String parentNodeId, String childNodeId, VariableOrder varOrder, ConstructGraph graph) { + super(nodeId, varOrder); + this.childNodeId = Preconditions.checkNotNull(childNodeId); + this.parentNodeId = Preconditions.checkNotNull(parentNodeId); + this.graph = Preconditions.checkNotNull(graph); } /** @@ -71,6 +61,13 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { public String getChildNodeId() { return childNodeId; } + + /** + * @return The parent of this construct node + */ + public String getParentNodeId() { + return parentNodeId; + } /** * @return The ConstructGraph used to form statement {@link BindingSet}s for @@ -82,7 +79,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { @Override public int hashCode() { - return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql); + return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), parentNodeId, childNodeId, graph); } @Override @@ -94,8 +91,8 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { if (o instanceof ConstructQueryMetadata) { ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o; if (super.equals(queryMetadata)) { - return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph) - .append(sparql, queryMetadata.sparql).isEquals(); + return new EqualsBuilder().append(parentNodeId, queryMetadata.parentNodeId).append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph) + .isEquals(); } return false; } @@ -105,7 +102,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { @Override public String toString() { return new StringBuilder().append("Construct Query Metadata {\n").append(" Node ID: " + super.getNodeId() + "\n") - .append(" SPARQL QUERY: " + sparql + "\n").append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") .append(" Child Node ID: " + childNodeId + "\n").append(" Construct Graph: " + graph.getProjections() + "\n") .append("}").toString(); } @@ -123,13 +120,14 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { * Builds instances of {@link QueryMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder { private String nodeId; private ConstructGraph graph; + private String parentNodeId; private String childNodeId; - private String sparql; + private VariableOrder varOrder; /** * Set the node Id that identifies this Construct Query Node @@ -144,21 +142,31 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { } /** - * Set the SPARQL String representing this construct query - * @param SPARQL string representing this construct query + * @return the node id for this construct query + */ + public String getNodeId() { + return nodeId; + } + + /** + * Sets the VariableOrder that determines how results will be written + * @param varOrder + * @return This builder so that method invocations may be chained. */ - public Builder setSparql(String sparql) { - this.sparql = sparql; + public Builder setVarOrder(VariableOrder varOrder) { + this.varOrder = varOrder; return this; } + + @Override + public VariableOrder getVariableOrder() { + return varOrder; + } /** - * Set the ConstructGraph used to form statement {@link BindingSet}s for - * this Construct Query + * Set the ConstructGraph used to form statement {@link BindingSet}s for this Construct Query * - * @param varOrder - * - ConstructGraph to project {@link BindingSet}s onto RDF - * statements + * @param varOrder - ConstructGraph to project {@link BindingSet}s onto RDF statements * @return This builder so that method invocations may be chained. */ public Builder setConstructGraph(ConstructGraph graph) { @@ -167,25 +175,37 @@ public class ConstructQueryMetadata extends CommonNodeMetadata { } /** - * Set the node whose results are projected onto the given - * {@link ConstructGraph}. + * Set the node whose results are projected onto the given {@link ConstructGraph}. * - * @param childNodeId - * - The node whose results are projected onto the given - * {@link ConstructGraph}. + * @param childNodeId - The node whose results are projected onto the given {@link ConstructGraph}. * @return This builder so that method invocations may be chained. */ public Builder setChildNodeId(String childNodeId) { this.childNodeId = childNodeId; return this; } + + public String getChildNodeId() { + return childNodeId; + } + + /** + * Set the parent node of this {@link ConstructGraph}. + * + * @param parentNodeId - The the parent node of this {@link ConstructGraph}. + * @return This builder so that method invocations may be chained. + */ + public Builder setParentNodeId(String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } /** * @return An instance of {@link ConstructQueryMetadata} build using * this builder's values. */ public ConstructQueryMetadata build() { - return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql); + return new ConstructQueryMetadata(nodeId, parentNodeId, childNodeId, varOrder, graph); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java index 7e2e995..a821d8c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java @@ -145,7 +145,7 @@ public class FilterMetadata extends CommonNodeMetadata { * Builds instances of {@link FilterMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder{ private final String nodeId; private VariableOrder varOrder; @@ -179,6 +179,11 @@ public class FilterMetadata extends CommonNodeMetadata { this.varOrder = varOrder; return this; } + + @Override + public VariableOrder getVariableOrder() { + return varOrder; + } /** * Set the original SPARQL query the filter is derived from. @@ -212,6 +217,10 @@ public class FilterMetadata extends CommonNodeMetadata { this.childNodeId = childNodeId; return this; } + + public String getChildNodeId() { + return childNodeId; + } /** * @return Returns an instance of {@link FilterMetadata} using this builder's values. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index 8d218af..65db02c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -44,7 +45,8 @@ import net.jcip.annotations.Immutable; @DefaultAnnotation(NonNull.class) public class FluoQuery { - private final Optional<QueryMetadata> queryMetadata; + private final QueryMetadata queryMetadata; + private final ImmutableMap<String, ProjectionMetadata> projectionMetadata; private final Optional<ConstructQueryMetadata> constructMetadata; private final Optional<PeriodicQueryMetadata> periodicQueryMetadata; private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata; @@ -52,13 +54,15 @@ public class FluoQuery { private final ImmutableMap<String, JoinMetadata> joinMetadata; private final ImmutableMap<String, AggregationMetadata> aggregationMetadata; private final QueryType type; - public static enum QueryType {Projection, Construct}; + private final String queryId; /** * Constructs an instance of {@link FluoQuery}. Private because applications * must use {@link Builder} instead. * - * @param queryMetadata - The root node of a query that is updated in Fluo. (not null) + * @param queryMetadata - metadata for the query for handling results (not null) + * @param projectionMetadata - projection nodes of query that project results (not null) + * @param constructMetadata - construct node of query that creates subgraphs * @param periodicQueryMetadata - The periodic query node that is updated in Fluo. * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as * it is represented within the Fluo app. (not null) @@ -71,52 +75,27 @@ public class FluoQuery { */ private FluoQuery( final QueryMetadata queryMetadata, + final ImmutableMap<String, ProjectionMetadata> projectionMetadata, + final Optional<ConstructQueryMetadata> constructMetadata, final Optional<PeriodicQueryMetadata> periodicQueryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, final ImmutableMap<String, JoinMetadata> joinMetadata, final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { this.aggregationMetadata = requireNonNull(aggregationMetadata); - this.queryMetadata = Optional.of(requireNonNull(queryMetadata)); - this.constructMetadata = Optional.absent(); + this.queryMetadata = requireNonNull(queryMetadata); + this.queryId = queryMetadata.getNodeId(); + this.projectionMetadata = requireNonNull(projectionMetadata); + this.constructMetadata = constructMetadata; this.periodicQueryMetadata = periodicQueryMetadata; this.statementPatternMetadata = requireNonNull(statementPatternMetadata); this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); - this.type = QueryType.Projection; - } - - - /** - * Constructs an instance of {@link FluoQuery}. Private because applications - * must use {@link Builder} instead. - * - * @param constructMetadata - The root node of a query that is updated in Fluo. (not null) - * @param periodicQueryMetadata - The periodic query node that is updated in Fluo. - * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as - * it is represented within the Fluo app. (not null) - * @param filterMetadata A map from Node ID to Filter metadata as it is represented - * within the Fluo app. (not null) - * @param joinMetadata - A map from Node ID to Join metadata as it is represented - * within the Fluo app. (not null) - * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is - * represented within the Fluo app. (not null) - */ - private FluoQuery( - final ConstructQueryMetadata constructMetadata, - final Optional<PeriodicQueryMetadata> periodicQueryMetadata, - final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, - final ImmutableMap<String, FilterMetadata> filterMetadata, - final ImmutableMap<String, JoinMetadata> joinMetadata, - final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { - this.constructMetadata = Optional.of(requireNonNull(constructMetadata)); - this.queryMetadata = Optional.absent(); - this.periodicQueryMetadata = periodicQueryMetadata; - this.statementPatternMetadata = requireNonNull(statementPatternMetadata); - this.filterMetadata = requireNonNull(filterMetadata); - this.joinMetadata = requireNonNull(joinMetadata); - this.aggregationMetadata = aggregationMetadata; - this.type = QueryType.Construct; + if(constructMetadata.isPresent()) { + this.type = QueryType.Construct; + } else { + this.type = QueryType.Projection; + } } /** @@ -126,24 +105,86 @@ public class FluoQuery { public QueryType getQueryType() { return type; } + + /** + * @return the unique id of this query + */ + public String getQueryId() { + return queryId; + } /** * @return Metadata about the root node of a query that is updated within the Fluo app. */ - public Optional<QueryMetadata> getQueryMetadata() { + public QueryMetadata getQueryMetadata() { return queryMetadata; } + /** + * @param nodeId - node id of the query metadata + * @return Optional containing the queryMetadata if it matches the specified nodeId + */ + public Optional<QueryMetadata> getQueryMetadata(String nodeId) { + if(queryMetadata.getNodeId().equals(nodeId)) { + return Optional.of(queryMetadata); + } else { + return Optional.absent(); + } + } + + /** + * @return construct query metadata for generating subgraphs + */ public Optional<ConstructQueryMetadata> getConstructQueryMetadata() { return constructMetadata; } /** + * @param nodeId - node id of the ConstructMetadata + * @return Optional containing the ConstructMetadata if it is present and has the given nodeId + */ + public Optional<ConstructQueryMetadata> getConstructQueryMetadata(String nodeId) { + if(constructMetadata.isPresent() && constructMetadata.get().getNodeId().equals(nodeId)) { + return constructMetadata; + } else { + return Optional.absent(); + } + } + + /** + * @param nodeId - id of the Projection metadata you want (not null) + * @return projection metadata corresponding to give nodeId + */ + public Optional<ProjectionMetadata> getProjectionMetadata(String nodeId) { + return Optional.fromNullable(projectionMetadata.get(nodeId)); + } + + /** + * @return All of the projection metadata that is stored for the query + */ + public Collection<ProjectionMetadata> getProjectionMetadata() { + return projectionMetadata.values(); + } + + /** * @return All of the Periodic Query metadata that is stored for the query. */ public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() { return periodicQueryMetadata; } + + /** + * @param nodeId - id of the PeriodicQueryMetadata + * @return Optional containing the PeriodicQueryMetadata if it is present and has the given nodeId + */ + public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(String nodeId) { + + if(periodicQueryMetadata.isPresent() && periodicQueryMetadata.get().getNodeId().equals(nodeId)) { + return periodicQueryMetadata; + } else { + return Optional.absent(); + } + } /** * Get a Statement Pattern node's metadata. @@ -254,8 +295,11 @@ public class FluoQuery { public String toString() { final StringBuilder builder = new StringBuilder(); - if(queryMetadata.isPresent()) { - builder.append( queryMetadata.get().toString() ); + builder.append(queryMetadata.toString()); + builder.append("\n"); + + for(final ProjectionMetadata metadata : projectionMetadata.values()) { + builder.append(metadata); builder.append("\n"); } @@ -305,9 +349,10 @@ public class FluoQuery { @DefaultAnnotation(NonNull.class) public static final class Builder { - private QueryMetadata.Builder queryBuilder = null; - private ConstructQueryMetadata.Builder constructBuilder = null; - private PeriodicQueryMetadata.Builder periodicQueryBuilder = null; + private QueryMetadata.Builder queryBuilder; + private ConstructQueryMetadata.Builder constructBuilder; + private PeriodicQueryMetadata.Builder periodicQueryBuilder; + private final Map<String, ProjectionMetadata.Builder> projectionBuilders = new HashMap<>(); private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>(); private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>(); private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>(); @@ -319,23 +364,55 @@ public class FluoQuery { * @param queryBuilder - The builder representing the query's results. * @return This builder so that method invocation may be chained. */ - public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) { - this.queryBuilder = queryBuilder; + public Builder setQueryMetadata(final QueryMetadata.Builder queryBuilder) { + this.queryBuilder = requireNonNull(queryBuilder); return this; } /** * @return The Query metadata builder if one has been set. */ - public Optional<QueryMetadata.Builder> getQueryBuilder() { - return Optional.fromNullable( queryBuilder ); + public QueryMetadata.Builder getQueryBuilder() { + return queryBuilder; + } + + /** + * @param nodeId - id of the QueryMetadata.Builder + * @return Optional containing the QueryMetadata.Builder if it has the specified nodeId + */ + public Optional<QueryMetadata.Builder> getQueryBuilder(String nodeId) { + if(queryBuilder.getNodeId().equals(nodeId)) { + return Optional.of(queryBuilder); + } else { + return Optional.absent(); + } + + } + + /** + * Sets the {@link ProjectionMetadata.Builder} that is used by this builder. + * + * @param projectionBuilder - The builder representing this query's projection + * @return This builder so that method invocation may be chained. + */ + public Builder addProjectionBuilder(@Nullable final ProjectionMetadata.Builder projectionBuilder) { + requireNonNull(projectionBuilder); + projectionBuilders.put(projectionBuilder.getNodeId(), projectionBuilder); + return this; + } + + /** + * @return The ProjectionMetadata builder if one has been set. + */ + public Optional<ProjectionMetadata.Builder> getProjectionBuilder(String nodeId) { + requireNonNull(nodeId); + return Optional.fromNullable( projectionBuilders.get(nodeId) ); } /** * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder. * - * @param constructBuilder - * - The builder representing the query's results. + * @param constructBuilder - The builder representing the query's results. * @return This builder so that method invocation may be chained. */ public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) { @@ -344,6 +421,18 @@ public class FluoQuery { } /** + * @param id of the ConstructQueryMetadata.Builder + * @return Optional containing the ConstructQueryMetadata.Builder if it has been set and has the given nodeId. + */ + public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder(String nodeId) { + if(constructBuilder != null && constructBuilder.getNodeId().equals(nodeId)) { + return Optional.of(constructBuilder); + } else { + return Optional.absent(); + } + } + + /** * @return The Construct Query metadata builder if one has been set. */ public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() { @@ -442,8 +531,6 @@ public class FluoQuery { this.aggregationBuilders.put(aggregationBuilder.getNodeId(), aggregationBuilder); return this; } - - /** * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder. @@ -457,7 +544,6 @@ public class FluoQuery { return this; } - /** * Get a PeriodicQuery builder from this builder. * @@ -467,24 +553,35 @@ public class FluoQuery { return Optional.fromNullable( periodicQueryBuilder); } - /** - * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. + * @param - id of the PeriodicQueryMetadata.Builder + * @return - Optional containing the PeriodicQueryMetadata.Builder if one has been set and it has the given nodeId */ - public FluoQuery build() { - checkArgument((queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null)); + public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder(String nodeId) { - Optional<QueryMetadata.Builder> optionalQueryBuilder = getQueryBuilder(); - QueryMetadata queryMetadata = null; - if(optionalQueryBuilder.isPresent()) { - queryMetadata = optionalQueryBuilder.get().build(); + if(periodicQueryBuilder != null && periodicQueryBuilder.getNodeId().equals(nodeId)) { + return Optional.of(periodicQueryBuilder); + } else { + return Optional.absent(); } + } + + /** + * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. + */ + public FluoQuery build() { + checkArgument((projectionBuilders.size() > 0 || constructBuilder != null)); Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder(); PeriodicQueryMetadata periodicQueryMetadata = null; if(optionalPeriodicQueryBuilder.isPresent()) { periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build(); } + + final ImmutableMap.Builder<String, ProjectionMetadata> projectionMetadata = ImmutableMap.builder(); + for(final Entry<String, ProjectionMetadata.Builder> entry : projectionBuilders.entrySet()) { + projectionMetadata.put(entry.getKey(), entry.getValue().build()); + } final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder(); for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) { @@ -506,12 +603,13 @@ public class FluoQuery { aggregateMetadata.put(entry.getKey(), entry.getValue().build()); } - if(queryBuilder != null) { - return new FluoQuery(queryBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); - } - //constructBuilder non-null in this case, but no need to check - else { - return new FluoQuery(constructBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + if(constructBuilder != null) { + if(periodicQueryMetadata != null) { + throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported."); + } + return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + } else { + return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index ed18d49..8cd25d0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -42,6 +42,20 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:queryType</td> <td>The {@link QueryType} of this query.</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:exportStrategies</td> <td>Strategies for exporting results from Rya Fluo app</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> + * </table> + * </p> + * <p> + * <b>Projection Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> + * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>* + * <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr> + * <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> + * <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr> * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> @@ -50,11 +64,11 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> - * <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> * <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr> * <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> - * <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:parentNodeId</td> <td>The Node ID of the parent that this node feeds.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr> * </table> * </p> * <p> @@ -131,6 +145,7 @@ public class FluoQueryColumns { public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; public static final String AGGREGATION_METADATA_CF = "aggregationMetadata"; public static final String CONSTRUCT_METADATA_CF = "constructMetadata"; + public static final String PROJECTION_METADATA_CF = "projectionMetadata"; public static final String PERIODIC_QUERY_METADATA_CF = "periodicQueryMetadata"; /** @@ -178,14 +193,24 @@ public class FluoQueryColumns { public static final Column QUERY_SPARQL = new Column(QUERY_METADATA_CF, "sparql"); public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId"); public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet"); + public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies"); + public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType"); + + // Query Metadata columns. + public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId"); + public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars"); + public static final Column PROJECTION_VARIABLE_ORDER = new Column(PROJECTION_METADATA_CF, "variableOrder"); + public static final Column PROJECTION_CHILD_NODE_ID = new Column(PROJECTION_METADATA_CF, "childNodeId"); + public static final Column PROJECTION_PARENT_NODE_ID = new Column(PROJECTION_METADATA_CF, "parentNodeId"); + public static final Column PROJECTION_BINDING_SET = new Column(PROJECTION_METADATA_CF, "bindingSet"); // Construct Query Metadata columns. public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId"); public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder"); public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph"); public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId"); + public static final Column CONSTRUCT_PARENT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "parentNodeId"); public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements"); - public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql"); // Filter Metadata columns. public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId"); @@ -256,8 +281,20 @@ public class FluoQueryColumns { Arrays.asList(QUERY_NODE_ID, QUERY_VARIABLE_ORDER, QUERY_SPARQL, + QUERY_TYPE, + QUERY_EXPORT_STRATEGIES, QUERY_CHILD_NODE_ID)), + /** + * The columns a {@link ProjectionMetadata} object's fields are stored within. + */ + PROJECTION_COLUMNS( + Arrays.asList(PROJECTION_NODE_ID, + PROJECTION_PROJECTED_VARS, + PROJECTION_VARIABLE_ORDER, + PROJECTION_PARENT_NODE_ID, + PROJECTION_CHILD_NODE_ID)), + /** * The columns a {@link PeriodicBinMetadata} object's fields are stored within. @@ -280,7 +317,7 @@ public class FluoQueryColumns { CONSTRUCT_VARIABLE_ORDER, CONSTRUCT_GRAPH, CONSTRUCT_CHILD_NODE_ID, - CONSTRUCT_SPARQL, + CONSTRUCT_PARENT_NODE_ID, CONSTRUCT_STATEMENTS)), http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 8675b80..5ba7383 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.fluo.api.client.SnapshotBase; @@ -34,6 +36,9 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; @@ -42,7 +47,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -64,10 +68,14 @@ public class FluoQueryMetadataDAO { requireNonNull(tx); requireNonNull(metadata); + Joiner joiner = Joiner.on(IncrementalUpdateConstants.VAR_DELIM); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId); tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString()); tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, metadata.getSparql() ); + tx.set(rowId, FluoQueryColumns.QUERY_EXPORT_STRATEGIES, joiner.join(metadata.getExportStrategies())); + tx.set(rowId, FluoQueryColumns.QUERY_TYPE, metadata.getQueryType().toString()); tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, metadata.getChildNodeId() ); } @@ -91,6 +99,8 @@ public class FluoQueryMetadataDAO { final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, FluoQueryColumns.QUERY_SPARQL, + FluoQueryColumns.QUERY_TYPE, + FluoQueryColumns.QUERY_EXPORT_STRATEGIES, FluoQueryColumns.QUERY_CHILD_NODE_ID); // Return an object holding them. @@ -99,13 +109,81 @@ public class FluoQueryMetadataDAO { final String sparql = values.get(FluoQueryColumns.QUERY_SPARQL); final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID); + final String queryType = values.get(FluoQueryColumns.QUERY_TYPE); + final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM); + + Set<ExportStrategy> strategies = new HashSet<>(); + for(String strategy: exportStrategies) { + strategies.add(ExportStrategy.valueOf(strategy)); + } return QueryMetadata.builder(nodeId) - .setVariableOrder( varOrder ) + .setVarOrder( varOrder ) .setSparql( sparql ) + .setExportStrategies(strategies) + .setQueryType(QueryType.valueOf(queryType)) .setChildNodeId( childNodeId ); } + + + /** + * Write an instance of {@link ProjectionMetadata} to the Fluo table. + * + * @param tx - The transaction that will be used to commit the metadata. (not null) + * @param metadata - The Query node metadata that will be written to the table. (not null) + */ + public void write(final TransactionBase tx, final ProjectionMetadata metadata) { + requireNonNull(tx); + requireNonNull(metadata); + + final String rowId = metadata.getNodeId(); + tx.set(rowId, FluoQueryColumns.PROJECTION_NODE_ID, rowId); + tx.set(rowId, FluoQueryColumns.PROJECTION_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.PROJECTION_PROJECTED_VARS, metadata.getProjectedVars().toString()); + tx.set(rowId, FluoQueryColumns.PROJECTION_PARENT_NODE_ID, metadata.getParentNodeId()); + tx.set(rowId, FluoQueryColumns.PROJECTION_CHILD_NODE_ID, metadata.getChildNodeId() ); + } + + /** + * Read an instance of {@link ProjectionMetadata} from the Fluo table. + * + * @param sx - The snapshot that will be used to read the metadata . (not null) + * @param nodeId - The nodeId of the Projection node that will be read. (not null) + * @return The {@link ProjectionMetadata} that was read from the table. + */ + public ProjectionMetadata readProjectionMetadata(final SnapshotBase sx, final String nodeId) { + return readProjectionMetadataBuilder(sx, nodeId).build(); + } + + private ProjectionMetadata.Builder readProjectionMetadataBuilder(final SnapshotBase sx, final String nodeId) { + requireNonNull(sx); + requireNonNull(nodeId); + + // Fetch the values from the Fluo table. + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, + FluoQueryColumns.PROJECTION_VARIABLE_ORDER, + FluoQueryColumns.PROJECTION_PROJECTED_VARS, + FluoQueryColumns.PROJECTION_PARENT_NODE_ID, + FluoQueryColumns.PROJECTION_CHILD_NODE_ID); + // Return an object holding them. + final String varOrderString = values.get(FluoQueryColumns.PROJECTION_VARIABLE_ORDER); + final String projectedVarString = values.get(FluoQueryColumns.PROJECTION_PROJECTED_VARS); + final VariableOrder varOrder = new VariableOrder(varOrderString); + final VariableOrder projectedVars = new VariableOrder(projectedVarString); + final String childNodeId = values.get(FluoQueryColumns.PROJECTION_CHILD_NODE_ID); + final String parentNodeId = values.get(FluoQueryColumns.PROJECTION_PARENT_NODE_ID); + + + return ProjectionMetadata.builder(nodeId) + .setVarOrder( varOrder ) + .setProjectedVars(projectedVars) + .setParentNodeId(parentNodeId) + .setChildNodeId( childNodeId ); + } + + /** * Write an instance of {@link ConstructQueryMetadata} to the Fluo table. * @@ -120,7 +198,7 @@ public class FluoQueryMetadataDAO { tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId); tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString()); tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() ); - tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql()); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID, metadata.getParentNodeId() ); tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph())); } @@ -143,18 +221,22 @@ public class FluoQueryMetadataDAO { final String rowId = nodeId; final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, - FluoQueryColumns.CONSTRUCT_SPARQL, - FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID); + FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, + FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID, + FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER); final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH); final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString); final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID); - final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL); + final String parentNodeId = values.get(FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID); + final String varOrderString = values.get(FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER); + return ConstructQueryMetadata.builder() .setNodeId(nodeId) + .setParentNodeId(parentNodeId) .setConstructGraph(graph) - .setSparql(sparql) + .setVarOrder(new VariableOrder(varOrderString)) .setChildNodeId(childNodeId); } @@ -342,7 +424,7 @@ public class FluoQueryMetadataDAO { final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID); return JoinMetadata.builder(nodeId) - .setVariableOrder(varOrder) + .setVarOrder(varOrder) .setJoinType(joinType) .setParentNodeId(parentNodeId) .setLeftChildNodeId(leftChildNodeId) @@ -477,7 +559,7 @@ public class FluoQueryMetadataDAO { } final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId) - .setVariableOrder(varOrder) + .setVarOrder(varOrder) .setParentNodeId(parentNodeId) .setChildNodeId(childNodeId) .setGroupByVariableOrder(groupByVars); @@ -498,27 +580,28 @@ public class FluoQueryMetadataDAO { public void write(final TransactionBase tx, final FluoQuery query) { requireNonNull(tx); requireNonNull(query); + + QueryMetadata queryMetadata = query.getQueryMetadata(); + final String sparql = queryMetadata.getSparql(); + final String queryId = queryMetadata.getNodeId(); + final String pcjId = queryMetadata.getExportId(); + + // The results of the query are eventually exported to an instance + // of Rya, so store the Rya ID for the PCJ. + tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); + tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); + tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); + write(tx, queryMetadata); // Write the rest of the metadata objects. - switch (query.getQueryType()) { - case Construct: + + if (query.getQueryType() == QueryType.Construct) { ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); - // Store the Query ID so that it may be looked up from the original - // SPARQL string. - final String constructSparql = constructMetadata.getSparql(); - final String constructQueryId = constructMetadata.getNodeId(); - tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId)); write(tx, constructMetadata); - break; - case Projection: - QueryMetadata queryMetadata = query.getQueryMetadata().get(); - // Store the Query ID so that it may be looked up from the original - // SPARQL string. - final String sparql = queryMetadata.getSparql(); - final String queryId = queryMetadata.getNodeId(); - tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); - write(tx, queryMetadata); - break; + } + + for(final ProjectionMetadata projection : query.getProjectionMetadata()) { + write(tx, projection); } Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata(); @@ -569,16 +652,23 @@ public class FluoQueryMetadataDAO { case QUERY: // Add this node's metadata. final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId); - Preconditions.checkArgument(!builder.getQueryBuilder().isPresent()); builder.setQueryMetadata(queryBuilder); // Add it's child's metadata. addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId()); break; - + + case PROJECTION: + //Add this node's metadata + final ProjectionMetadata.Builder projectionBuilder = readProjectionMetadataBuilder(sx, childNodeId); + builder.addProjectionBuilder(projectionBuilder); + + //Add it's child's metadata + addChildMetadata(sx, builder, projectionBuilder.build().getChildNodeId()); + break; + case CONSTRUCT: final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId); - Preconditions.checkArgument(!builder.getQueryBuilder().isPresent()); builder.setConstructQueryMetadata(constructBuilder); // Add it's child's metadata. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java index 7bad9a7..aa79daf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java @@ -163,7 +163,7 @@ public class JoinMetadata extends CommonNodeMetadata { * Builds instances of {@link JoinMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder { private final String nodeId; private VariableOrder varOrder; @@ -194,11 +194,16 @@ public class JoinMetadata extends CommonNodeMetadata { * @param varOrder - The variable order of the binding sets that are emitted by this node. * @return This builder so that method invocation could be chained. */ - public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + public Builder setVarOrder(@Nullable final VariableOrder varOrder) { this.varOrder = varOrder; return this; } + @Override + public VariableOrder getVariableOrder() { + return varOrder; + } + /** * Sets the node id of this node's parent. * @@ -242,6 +247,14 @@ public class JoinMetadata extends CommonNodeMetadata { this.rightChildNodeId = rightChildNodeId; return this; } + + public String getLeftChildNodeId() { + return leftChildNodeId; + } + + public String getRightChildNodeId() { + return rightChildNodeId; + } /** * @return An instance of {@link JoinMetadata} built using this builder's values. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java index 33253f2..ae4b10e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java @@ -166,7 +166,7 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata { /** * Builder for chaining method calls to construct an instance of PeriodicQueryMetadata. */ - public static class Builder { + public static class Builder implements CommonNodeMetadata.Builder { private String nodeId; private VariableOrder varOrder; @@ -200,11 +200,12 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata { return this; } + /** * Returns {@link VariableOrder} * @return VariableOrder that indicates order that results are written in */ - public VariableOrder getVarOrder() { + public VariableOrder getVariableOrder() { return varOrder; } @@ -235,6 +236,10 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata { return this; } + public String getChildNodeId() { + return childNodeId; + } + /** * Sets window size for periodic query * @param windowSize
