http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java new file mode 100644 index 0000000..d43ffc9 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; + +import com.google.common.collect.Maps; + +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.data.Span; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.TypedObserver; +import io.fluo.api.types.TypedTransactionBase; + +/** + * An observer that matches new Triples to the Statement Patterns that are part + * of any PCJ that is being maintained. If the triple matches a pattern, then + * the new result is stored as a binding set for the pattern. 
+ */ +public class TripleObserver extends TypedObserver { + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + public TripleObserver() { + super(new StringTypeLayer()); + } + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG); + } + + @Override + public void process(final TypedTransactionBase tx, final Bytes row, final Column column) { + //get string representation of triple + final String triple = IncUpdateDAO.getTripleString(row); + + //get variable metadata for all SP in table + final ScannerConfiguration sc1 = new ScannerConfiguration(); + sc1.fetchColumn(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER.getFamily(), FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER.getQualifier()); + sc1.setSpan(Span.prefix(SP_PREFIX)); + + //see if triple matches conditions of any of the SP + final RowIterator ri = tx.get(sc1); + + while(ri.hasNext()) { + + final Entry<Bytes, ColumnIterator> next = ri.next(); + final ColumnIterator ci = next.getValue(); + final String spID = next.getKey().toString(); + + final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spID); + final String pattern = spMetadata.getStatementPattern(); + + while(ci.hasNext()) { + final String varOrders = ci.next().getValue().toString(); + final String bindingSet = getBindingSet(triple, pattern, varOrders); + if(bindingSet.length() != 0) { + tx.mutate().row(spID + NODEID_BS_DELIM + bindingSet).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(bindingSet); + } + } + } + + // Once the triple has been handled, it may be deleted. 
+ tx.delete(row, column); + } + + //determines whether triple matches SPID conditions and generates bindingset + //whose order is determined by varOrder + private static String getBindingSet(final String triple, final String spID, final String varOrder) { + final String[] spIdArray = spID.split(DELIM); + final String[] tripleArray = triple.split(DELIM); + final String[] varOrderArray = varOrder.split(VAR_DELIM); + final Map<String,String> varMap = Maps.newHashMap(); + + if(spIdArray.length != 3 || tripleArray.length != 3) { + throw new IllegalArgumentException("Invald number of components"); + } + + for(int i = 0; i < 3; i ++) { + + if(spIdArray[i].startsWith("-const-")) { + if(!spIdArray[i].substring(7).equals(tripleArray[i])) { + return ""; + } + } else{ + varMap.put(spIdArray[i], tripleArray[i]); + } + + } + + String bindingSet = ""; + + for (final String element : varOrderArray) { + if(bindingSet.length() == 0) { + bindingSet = varMap.get(element); + } else { + bindingSet = bindingSet + DELIM + varMap.get(element); + } + } + + return bindingSet; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java new file mode 100644 index 0000000..b559037 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +import com.google.common.base.Objects; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Metadata that is common to all nodes that are part of a query. 
+ */ +@Immutable +@ParametersAreNonnullByDefault +public abstract class CommonNodeMetadata { + + private final String nodeId; + private final VariableOrder varOrder; + + /** + * Constructs an instance of {@link CommonNodeMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) + */ + public CommonNodeMetadata( + final String nodeId, + final VariableOrder varOrder) { + this.nodeId = checkNotNull(nodeId); + this.varOrder = checkNotNull(varOrder); + } + + /** + * @return The ID the Fluo app uses to reference this node. + */ + public String getNodeId() { + return nodeId; + } + + /** + * @return The variable order of binding sets that are emitted by this node. + */ + public VariableOrder getVariableOrder() { + return varOrder; + } + + @Override + public int hashCode() { + return Objects.hashCode( + nodeId, + varOrder); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } + + if(o instanceof CommonNodeMetadata) { + final CommonNodeMetadata metadata = (CommonNodeMetadata)o; + return new EqualsBuilder() + .append(nodeId, metadata.nodeId) + .append(varOrder, metadata.varOrder) + .isEquals(); + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("CommonNodeMetadata { ") + .append(" Node ID: " + nodeId + "\n") + .append(" Variable Order: " + varOrder + "\n") + .append("}") + .toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java new file mode 100644 index 0000000..0194eb5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +import com.google.common.base.Objects; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Metadata that is specific to Filter nodes. 
+ */ +@Immutable +@ParametersAreNonnullByDefault +public class FilterMetadata extends CommonNodeMetadata { + + private final String originalSparql; + private final int filterIndexWithinSparql; + private final String parentNodeId; + private final String childNodeId; + + /** + * Constructs an instance of {@link FilterMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) + * @param originalSparql - The original SPARQL query the filter is derived from. (not null) + * @param filterIndexWithinSparql - The index of the filter within the original SPARQL query + * that this node processes. (not null) + * @param parentNodeId - The node id of this node's parent. (not null) + * @param childNodeId - The node id of this node's child. (not null) + */ + public FilterMetadata( + final String nodeId, + final VariableOrder varOrder, + final String originalSparql, + final int filterIndexWithinSparql, + final String parentNodeId, + final String childNodeId) { + super(nodeId, varOrder); + this.originalSparql = checkNotNull(originalSparql); + checkArgument(filterIndexWithinSparql >= 0 , "filterIndexWithinSparql must be >= 0, was " + filterIndexWithinSparql); + this.filterIndexWithinSparql = filterIndexWithinSparql; + this.parentNodeId = checkNotNull(parentNodeId); + this.childNodeId = checkNotNull(childNodeId); + } + + /** + * @return The original SPARQL query the filter is derived from. + */ + public String getOriginalSparql() { + return originalSparql; + } + + /** + * @return The index of the filter within the original SPARQL query that + * this node processes. + */ + public int getFilterIndexWithinSparql() { + return filterIndexWithinSparql; + } + + /** + * @return The node id of this node's parent. + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * @return The node whose results are being filtered. 
+ */ + public String getChildNodeId() { + return childNodeId; + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.getNodeId(), + super.getVariableOrder(), + originalSparql, + filterIndexWithinSparql, + parentNodeId, + childNodeId); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } + + if(o instanceof FilterMetadata) { + if(super.equals(o)) { + final FilterMetadata filterMetadata = (FilterMetadata)o; + return new EqualsBuilder() + .append(originalSparql, filterMetadata.originalSparql) + .append(filterIndexWithinSparql, filterMetadata.filterIndexWithinSparql) + .append(parentNodeId, filterMetadata.parentNodeId) + .append(childNodeId, filterMetadata.childNodeId) + .isEquals(); + } + return false; + + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("Filter Metadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Parent Node ID: " + parentNodeId + "\n") + .append(" Child Node ID: " + childNodeId + "\n") + .append(" Original SPARQL: " + originalSparql + "\n") + .append(" Filter Index Within SPARQL: " + filterIndexWithinSparql + "\n") + .append("}") + .toString(); + } + + /** + * Creates a new {@link Builder} for this class. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @return A new {@link Builder} for this class. + */ + public static Builder builder(final String nodeId) { + return new Builder(nodeId); + } + + /** + * Builds instances of {@link FilterMetadata}. + */ + @ParametersAreNonnullByDefault + public static final class Builder { + + private final String nodeId; + private VariableOrder varOrder; + private String originalSparql; + private int filterIndexWithinSparql; + private String parentNodeId; + private String childNodeId; + + /** + * Constructs an instance of {@link Builder}. 
+ * + * @param nodeId - the ID the Fluo app uses to reference this node. + */ + public Builder(final String nodeId) { + this.nodeId = checkNotNull(nodeId); + } + + /** + * @return the ID the Fluo app uses to reference this node. + */ + public String getNodeId() { + return nodeId; + } + + /** + * Set the variable order of binding sets that are emitted by this node. + * + * @param varOrder - The variable order of binding sets that are emitted by this node. + * @return This builder so that method invocations may be chained. + */ + public Builder setVarOrder(@Nullable final VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * Set the original SPARQL query the filter is derived from. + * + * @param originalSparql - The original SPARQL query the filter is derived from. + * @return This builder so that method invocations may be chained. + */ + public Builder setOriginalSparql(final String originalSparql) { + this.originalSparql = originalSparql; + return this; + } + + /** + * Set the index of the filter within the original SPARQL query that this node processes. + * + * @param filterIndexWithinSparql - The index of the filter within the original + * SPARQL query that this node processes. + * @return This builder so that method invocations may be chained. + */ + public Builder setFilterIndexWithinSparql(final int filterIndexWithinSparql) { + this.filterIndexWithinSparql = filterIndexWithinSparql; + return this; + } + + /** + * Set the node ID of this node's parent. + * + * @param parentNodeId - The node ID of this node's parent. + * @return This builder so that method invocations may be chained. + */ + public Builder setParentNodeId(@Nullable final String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } + + /** + * Set the node ID of this node's child. + * + * @param childNodeId - The node id of this node's child. + * @return This builder so that method invocations may be chained. 
+ */ + public Builder setChildNodeId(@Nullable final String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + /** + * @return Returns an instance of {@link FilterMetadata} using this builder's values. + */ + public FilterMetadata build() { + return new FilterMetadata( + nodeId, + varOrder, + originalSparql, + filterIndexWithinSparql, + parentNodeId, + childNodeId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java new file mode 100644 index 0000000..4282c99 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under the Apache License,
 * Version 2.0; see http://www.apache.org/licenses/LICENSE-2.0 for the terms.
 */
package org.apache.rya.indexing.pcj.fluo.app.query;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.Immutable;

import org.apache.commons.lang3.builder.EqualsBuilder;

import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;

/**
 * Metadata for every node of a query that is being updated by the Fluo application.
 */
@Immutable
@ParametersAreNonnullByDefault
public class FluoQuery {

    private final QueryMetadata queryMetadata;
    private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
    private final ImmutableMap<String, FilterMetadata> filterMetadata;
    private final ImmutableMap<String, JoinMetadata> joinMetadata;

    /**
     * Constructs an instance of {@link FluoQuery}. Private because applications
     * must use {@link Builder} instead.
     *
     * @param queryMetadata - The root node of a query that is updated in Fluo. (not null)
     * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
     *   it is represented within the Fluo app. (not null)
     * @param filterMetadata - A map from Node ID to Filter metadata as it is represented
     *   within the Fluo app. (not null)
     * @param joinMetadata - A map from Node ID to Join metadata as it is represented
     *   within the Fluo app. (not null)
     */
    private FluoQuery(
            final QueryMetadata queryMetadata,
            final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
            final ImmutableMap<String, FilterMetadata> filterMetadata,
            final ImmutableMap<String, JoinMetadata> joinMetadata) {
        this.queryMetadata = checkNotNull(queryMetadata);
        this.statementPatternMetadata = checkNotNull(statementPatternMetadata);
        this.filterMetadata = checkNotNull(filterMetadata);
        this.joinMetadata = checkNotNull(joinMetadata);
    }

    /**
     * @return Metadata about the root node of a query that is updated within the Fluo app.
     */
    public QueryMetadata getQueryMetadata() {
        return queryMetadata;
    }

    /**
     * Get a Statement Pattern node's metadata.
     *
     * @param nodeId - The node ID of the StatementPattern metadata you want. (not null)
     * @return The StatementPattern metadata if it could be found; otherwise absent.
     */
    public Optional<StatementPatternMetadata> getStatementPatternMetadata(final String nodeId) {
        checkNotNull(nodeId);
        return Optional.fromNullable(statementPatternMetadata.get(nodeId));
    }

    /**
     * @return All of the Statement Pattern metadata that is stored for the query.
     */
    public Collection<StatementPatternMetadata> getStatementPatternMetadata() {
        return statementPatternMetadata.values();
    }

    /**
     * Get a Filter node's metadata.
     *
     * @param nodeId - The node ID of the Filter metadata you want. (not null)
     * @return The Filter metadata if it could be found; otherwise absent.
     */
    public Optional<FilterMetadata> getFilterMetadata(final String nodeId) {
        checkNotNull(nodeId);
        return Optional.fromNullable(filterMetadata.get(nodeId));
    }

    /**
     * @return All of the Filter metadata that is stored for the query.
     */
    public Collection<FilterMetadata> getFilterMetadata() {
        return filterMetadata.values();
    }

    /**
     * Get a Join node's metadata.
     *
     * @param nodeId - The node ID of the Join metadata you want. (not null)
     * @return The Join metadata if it could be found; otherwise absent.
     */
    public Optional<JoinMetadata> getJoinMetadata(final String nodeId) {
        checkNotNull(nodeId);
        return Optional.fromNullable(joinMetadata.get(nodeId));
    }

    /**
     * @return All of the Join metadata that is stored for the query.
     */
    public Collection<JoinMetadata> getJoinMetadata() {
        return joinMetadata.values();
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(
                queryMetadata,
                statementPatternMetadata,
                filterMetadata,
                joinMetadata);
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FluoQuery)) {
            return false;
        }

        final FluoQuery other = (FluoQuery) o;
        return new EqualsBuilder()
                .append(queryMetadata, other.queryMetadata)
                .append(statementPatternMetadata, other.statementPatternMetadata)
                .append(filterMetadata, other.filterMetadata)
                .append(joinMetadata, other.joinMetadata)
                .isEquals();
    }

    /**
     * @return The string form of the query metadata, then every Filter, Join
     *   and Statement Pattern metadata, each followed by a newline.
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();

        // queryMetadata is never null: the constructor rejects null.
        builder.append(queryMetadata).append("\n");
        appendEach(builder, filterMetadata.values());
        appendEach(builder, joinMetadata.values());
        appendEach(builder, statementPatternMetadata.values());

        return builder.toString();
    }

    /** Appends the string form of every element, each followed by a newline. */
    private static void appendEach(final StringBuilder builder, final Collection<?> nodes) {
        for (final Object node : nodes) {
            builder.append(node).append("\n");
        }
    }

    /**
     * @return A new {@link Builder} for this class.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds instances of {@link FluoQuery}.
     */
    @ParametersAreNonnullByDefault
    public static final class Builder {

        private QueryMetadata.Builder queryBuilder = null;
        private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
        private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
        private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();

        /**
         * Sets the {@link QueryMetadata.Builder} that is used by this builder.
         *
         * @param queryMetadata - The builder representing the query's results.
         *   May be null while the query is being assembled, but must be non-null
         *   by the time {@link #build()} is invoked.
         * @return This builder so that method invocation may be chained.
         */
        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryMetadata) {
            this.queryBuilder = queryMetadata;
            return this;
        }

        /**
         * @return The Query metadata builder if one has been set.
         */
        public Optional<QueryMetadata.Builder> getQueryBuilder() {
            return Optional.fromNullable(queryBuilder);
        }

        /**
         * Adds a new {@link StatementPatternMetadata.Builder} to this builder.
         *
         * @param spBuilder - A builder representing a specific Statement Pattern within the query. (not null)
         * @return This builder so that method invocation may be chained.
         */
        public Builder addStatementPatternBuilder(final StatementPatternMetadata.Builder spBuilder) {
            checkNotNull(spBuilder);
            spBuilders.put(spBuilder.getNodeId(), spBuilder);
            return this;
        }

        /**
         * Get a Statement Pattern builder from this builder.
         *
         * @param nodeId - The Node ID the Statement Pattern builder was stored at. (not null)
         * @return The builder that was stored at the node id if one was found.
         */
        public Optional<StatementPatternMetadata.Builder> getStatementPatternBuilder(final String nodeId) {
            checkNotNull(nodeId);
            return Optional.fromNullable(spBuilders.get(nodeId));
        }

        /**
         * Adds a new {@link FilterMetadata.Builder} to this builder.
         *
         * @param filterBuilder - A builder representing a specific Filter within the query. (not null)
         * @return This builder so that method invocation may be chained.
         */
        public Builder addFilterMetadata(final FilterMetadata.Builder filterBuilder) {
            // Null has always been rejected here, so the parameter is not
            // annotated as nullable.
            checkNotNull(filterBuilder);
            filterBuilders.put(filterBuilder.getNodeId(), filterBuilder);
            return this;
        }

        /**
         * Get a Filter builder from this builder.
         *
         * @param nodeId - The Node ID the Filter builder was stored at. (not null)
         * @return The builder that was stored at the node id if one was found.
         */
        public Optional<FilterMetadata.Builder> getFilterBuilder(final String nodeId) {
            checkNotNull(nodeId);
            return Optional.fromNullable(filterBuilders.get(nodeId));
        }

        /**
         * Adds a new {@link JoinMetadata.Builder} to this builder.
         *
         * @param joinBuilder - A builder representing a specific Join within the query. (not null)
         * @return This builder so that method invocation may be chained.
         */
        public Builder addJoinMetadata(final JoinMetadata.Builder joinBuilder) {
            // Null has always been rejected here, so the parameter is not
            // annotated as nullable.
            checkNotNull(joinBuilder);
            joinBuilders.put(joinBuilder.getNodeId(), joinBuilder);
            return this;
        }

        /**
         * Get a Join builder from this builder.
         *
         * @param nodeId - The Node ID the Join builder was stored at. (not null)
         * @return The builder that was stored at the node id if one was found.
         */
        public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) {
            checkNotNull(nodeId);
            return Optional.fromNullable(joinBuilders.get(nodeId));
        }

        /**
         * Builds every node builder that was supplied and assembles the results
         * into a {@link FluoQuery}.
         *
         * @return A {@link FluoQuery} using the values that have been supplied to this builder.
         * @throws NullPointerException If no query metadata builder has been set.
         */
        public FluoQuery build() {
            checkNotNull(queryBuilder, "A QueryMetadata.Builder must be set before build() is invoked.");
            final QueryMetadata queryMetadata = queryBuilder.build();

            final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
            for (final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
                spMetadata.put(entry.getKey(), entry.getValue().build());
            }

            final ImmutableMap.Builder<String, FilterMetadata> filterMetadata = ImmutableMap.builder();
            for (final Entry<String, FilterMetadata.Builder> entry : filterBuilders.entrySet()) {
                filterMetadata.put(entry.getKey(), entry.getValue().build());
            }

            final ImmutableMap.Builder<String, JoinMetadata> joinMetadata = ImmutableMap.builder();
            for (final Entry<String, JoinMetadata.Builder> entry : joinBuilders.entrySet()) {
                joinMetadata.put(entry.getKey(), entry.getValue().build());
            }

            return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build());
        }
    }
}
// \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java new file mode 100644 index 0000000..3d766ed --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import io.fluo.api.data.Column; + +/** + * Holds {@link Column} objects that represent where each piece of metadata is stored. + * <p> + * See the tables below for information specific to each metadata model. + * <p> + * <b>Query Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A Binding Set that matches the query.</td> </tr> + * </table> + * </p> + * <p> + * <b>Filter Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:veriableOrder</td> <td>The Variable Order 
binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:originalSparql</td> <td>The original SPARQL query this filter was derived from.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> <td>Indicates which filter within the original SPARQL query this represents.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node ID this filter emits Binding Sets to.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node ID of the node that feeds this node Binding Sets.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding set String </td> <td>filterMetadata:bindingSet</td> <td>A Binding Set that matches the Filter.</td> </tr> + * </table> + * <p> + * <p> + * <b>Join Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:nodeId</td> <td>The Node ID of the Join.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding set String</td> <td>joinMetadata:bindingSet</td> <td>A Binding Set that matches the Join.</td> </tr> + * </table> + * <p> + * <p> + * <b>Statement Pattern Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr> + * <tr> <td>Node ID</td> <td>statementPatternMetadata:nodeId</td> <td>The Node ID of the Statement Pattern.</td> </tr> + * <tr> <td>Node 
ID</td> <td>statementPatternMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>statementPatternMetadata:pattern</td> <td>The pattern that defines which Statements will be matched.</td> </tr> + * <tr> <td>Node ID</td> <td>statementPatternMetadata:parentNodeId</td> <td>The Node ID this statement pattern emits Binding Sets to.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding set String</td> <td>statementPatternMetadata:bindingSet</td> <td>A Binding Set that matches the Statement Pattern.</td> </tr> + * </table> + * <p> + */ +public class FluoQueryColumns { + + // Column families used to store query metadata. + public static final String QUERY_METADATA_CF = "queryMetadata"; + public static final String FILTER_METADATA_CF = "filterMetadata"; + public static final String JOIN_METADATA_CF = "joinMetadata"; + public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; + + /** + * New triples that have been added to Rya are written as a row in this + * column so that any queries that include them in their results will be + * updated. + * <p> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>empty</td> </tr> + * </table> + */ + public static final Column TRIPLES = new Column("triples", "SPO"); + + /** + * Stores the name of the Accumulo table the query's results will be stored. + * The table's structure is defined by Rya's. 
+ * </p> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Query ID</td> <td>query:ryaExportTableName</td> + * <td>The name of the Accumulo table the results will be exported to using + * the Rya PCJ table structure.</td> </tr> + * </table> + */ + public static final Column QUERY_RYA_EXPORT_TABLE_NAME = new Column("query", "ryaExportTableName"); + + + // Sparql to Query ID used to list all queries that are in the system. + public static final Column QUERY_ID = new Column("sparql", "queryId"); + + // Query Metadata columns. + public static final Column QUERY_NODE_ID = new Column(QUERY_METADATA_CF, "nodeId"); + public static final Column QUERY_VARIABLE_ORDER = new Column(QUERY_METADATA_CF, "variableOrder"); + public static final Column QUERY_SPARQL = new Column(QUERY_METADATA_CF, "sparql"); + public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId"); + public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet"); + + // Filter Metadata columns. + public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId"); + public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder"); + public static final Column FILTER_ORIGINAL_SPARQL = new Column(FILTER_METADATA_CF, "originalSparql"); + public static final Column FILTER_INDEX_WITHIN_SPARQL = new Column(FILTER_METADATA_CF, "filterIndexWithinSparql"); + public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId"); + public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId"); + public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet"); + + // Join Metadata columns. 
+ public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId"); + public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder"); + public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId"); + public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId"); + public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId"); + public static final Column JOIN_BINDING_SET = new Column(JOIN_METADATA_CF, "bindingSet"); + + // Statement Pattern Metadata columns. + public static final Column STATEMENT_PATTERN_NODE_ID = new Column(STATEMENT_PATTERN_METADATA_CF, "nodeId"); + public static final Column STATEMENT_PATTERN_VARIABLE_ORDER = new Column(STATEMENT_PATTERN_METADATA_CF, "variableOrder"); + public static final Column STATEMENT_PATTERN_PATTERN = new Column(STATEMENT_PATTERN_METADATA_CF, "pattern"); + public static final Column STATEMENT_PATTERN_PARENT_NODE_ID = new Column(STATEMENT_PATTERN_METADATA_CF, "parentNodeId"); + public static final Column STATEMENT_PATTERN_BINDING_SET = new Column(STATEMENT_PATTERN_METADATA_CF, "bindingSet"); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java new file mode 100644 index 0000000..0ab5f18 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
package org.apache.rya.indexing.pcj.fluo.app.query;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Map;

import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.rya.indexing.pcj.fluo.app.NodeType;

import com.google.common.collect.Sets;

import io.fluo.api.client.SnapshotBase;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.types.Encoder;
import io.fluo.api.types.StringEncoder;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;

/**
 * Reads and writes {@link FluoQuery} instances and their components to/from
 * a Fluo table.
 * <p>
 * Each node's metadata is stored in the row whose ID is the node's ID, one
 * value per column of {@link FluoQueryColumns}.
 */
@ParametersAreNonnullByDefault
public class FluoQueryMetadataDAO {

    private static final Encoder encoder = new StringEncoder();

    /**
     * Fetches the value stored for a column, failing with a descriptive
     * message when it is absent.
     * <p>
     * The {@code write} methods of this class always set every column of a
     * node, so an absent value means the node's row is missing or incomplete.
     * Checking here reports which node and column were involved instead of
     * handing a {@code null} to the encoder.
     *
     * @param values - The column values that were read for the node. (not null)
     * @param column - The column whose value is required. (not null)
     * @param nodeId - The ID of the node that was read; only used in the error message. (not null)
     * @return The value stored for {@code column}.
     * @throws NullPointerException No value is present for {@code column}.
     */
    private static Bytes requireValue(final Map<Column, Bytes> values, final Column column, final String nodeId) {
        return checkNotNull(
                values.get(column),
                "Could not read the metadata of node '%s' because no value is stored in column '%s'.",
                nodeId,
                column);
    }

    /**
     * Write an instance of {@link QueryMetadata} to the Fluo table.
     *
     * @param tx - The transaction that will be used to commit the metadata. (not null)
     * @param metadata - The Query node metadata that will be written to the table. (not null)
     */
    public void write(final TransactionBase tx, final QueryMetadata metadata) {
        checkNotNull(tx);
        checkNotNull(metadata);

        // The node ID is both the row ID and the value of the node ID column.
        final Bytes rowId = encoder.encode(metadata.getNodeId());
        tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
        tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
        tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, encoder.encode( metadata.getSparql() ));
        tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() ));
    }

    /**
     * Read an instance of {@link QueryMetadata} from the Fluo table.
     *
     * @param sx - The snapshot that will be used to read the metadata. (not null)
     * @param nodeId - The nodeId of the Query node that will be read. (not null)
     * @return The {@link QueryMetadata} that was read from table.
     */
    public QueryMetadata readQueryMetadata(final SnapshotBase sx, final String nodeId) {
        return readQueryMetadataBuilder(sx, nodeId).build();
    }

    private QueryMetadata.Builder readQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
        checkNotNull(sx);
        checkNotNull(nodeId);

        // Fetch the values from the Fluo table.
        final Bytes rowId = encoder.encode(nodeId);
        final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
                FluoQueryColumns.QUERY_VARIABLE_ORDER,
                FluoQueryColumns.QUERY_SPARQL,
                FluoQueryColumns.QUERY_CHILD_NODE_ID));

        // Return an object holding them.
        final String varOrderString = encoder.decodeString( requireValue(values, FluoQueryColumns.QUERY_VARIABLE_ORDER, nodeId) );
        final VariableOrder varOrder = new VariableOrder(varOrderString);

        final String sparql = encoder.decodeString( requireValue(values, FluoQueryColumns.QUERY_SPARQL, nodeId) );
        final String childNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.QUERY_CHILD_NODE_ID, nodeId) );

        return QueryMetadata.builder(nodeId)
                .setVariableOrder( varOrder )
                .setSparql( sparql )
                .setChildNodeId( childNodeId );
    }

    /**
     * Write an instance of {@link FilterMetadata} to the Fluo table.
     *
     * @param tx - The transaction that will be used to commit the metadata. (not null)
     * @param metadata - The Filter node metadata that will be written to the table. (not null)
     */
    public void write(final TransactionBase tx, final FilterMetadata metadata) {
        checkNotNull(tx);
        checkNotNull(metadata);

        final Bytes rowId = encoder.encode(metadata.getNodeId());
        tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
        tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
        tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, encoder.encode( metadata.getOriginalSparql() ));
        tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, encoder.encode( metadata.getFilterIndexWithinSparql() ));
        tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
        tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() ));
    }

    /**
     * Read an instance of {@link FilterMetadata} from the Fluo table.
     *
     * @param sx - The snapshot that will be used to read the metadata. (not null)
     * @param nodeId - The nodeId of the Filter node that will be read. (not null)
     * @return The {@link FilterMetadata} that was read from table.
     */
    public FilterMetadata readFilterMetadata(final SnapshotBase sx, final String nodeId) {
        return readFilterMetadataBuilder(sx, nodeId).build();
    }

    private FilterMetadata.Builder readFilterMetadataBuilder(final SnapshotBase sx, final String nodeId) {
        checkNotNull(sx);
        checkNotNull(nodeId);

        // Fetch the values from the Fluo table.
        final Bytes rowId = encoder.encode(nodeId);
        final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
                FluoQueryColumns.FILTER_VARIABLE_ORDER,
                FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
                FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
                FluoQueryColumns.FILTER_PARENT_NODE_ID,
                FluoQueryColumns.FILTER_CHILD_NODE_ID));

        // Return an object holding them.
        final String varOrderString = encoder.decodeString( requireValue(values, FluoQueryColumns.FILTER_VARIABLE_ORDER, nodeId) );
        final VariableOrder varOrder = new VariableOrder(varOrderString);

        final String originalSparql = encoder.decodeString( requireValue(values, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, nodeId) );
        final int filterIndexWithinSparql = encoder.decodeInteger( requireValue(values, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, nodeId) );
        final String parentNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.FILTER_PARENT_NODE_ID, nodeId) );
        final String childNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.FILTER_CHILD_NODE_ID, nodeId) );

        return FilterMetadata.builder(nodeId)
                .setVarOrder(varOrder)
                .setOriginalSparql(originalSparql)
                .setFilterIndexWithinSparql(filterIndexWithinSparql)
                .setParentNodeId(parentNodeId)
                .setChildNodeId(childNodeId);
    }

    /**
     * Write an instance of {@link JoinMetadata} to the Fluo table.
     *
     * @param tx - The transaction that will be used to commit the metadata. (not null)
     * @param metadata - The Join node metadata that will be written to the table. (not null)
     */
    public void write(final TransactionBase tx, final JoinMetadata metadata) {
        checkNotNull(tx);
        checkNotNull(metadata);

        final Bytes rowId = encoder.encode(metadata.getNodeId());
        tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
        tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
        tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
        tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, encoder.encode( metadata.getLeftChildNodeId() ));
        tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, encoder.encode( metadata.getRightChildNodeId() ));
    }

    /**
     * Read an instance of {@link JoinMetadata} from the Fluo table.
     *
     * @param sx - The snapshot that will be used to read the metadata. (not null)
     * @param nodeId - The nodeId of the Join node that will be read. (not null)
     * @return The {@link JoinMetadata} that was read from table.
     */
    public JoinMetadata readJoinMetadata(final SnapshotBase sx, final String nodeId) {
        return readJoinMetadataBuilder(sx, nodeId).build();
    }

    private JoinMetadata.Builder readJoinMetadataBuilder(final SnapshotBase sx, final String nodeId) {
        checkNotNull(sx);
        checkNotNull(nodeId);

        // Fetch the values from the Fluo table.
        final Bytes rowId = encoder.encode(nodeId);
        final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
                FluoQueryColumns.JOIN_VARIABLE_ORDER,
                FluoQueryColumns.JOIN_PARENT_NODE_ID,
                FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
                FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID));

        // Return an object holding them.
        final String varOrderString = encoder.decodeString( requireValue(values, FluoQueryColumns.JOIN_VARIABLE_ORDER, nodeId) );
        final VariableOrder varOrder = new VariableOrder(varOrderString);

        final String parentNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.JOIN_PARENT_NODE_ID, nodeId) );
        final String leftChildNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, nodeId) );
        final String rightChildNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, nodeId) );

        return JoinMetadata.builder(nodeId)
                .setVariableOrder(varOrder)
                .setParentNodeId(parentNodeId)
                .setLeftChildNodeId(leftChildNodeId)
                .setRightChildNodeId(rightChildNodeId);
    }

    /**
     * Write an instance of {@link StatementPatternMetadata} to the Fluo table.
     *
     * @param tx - The transaction that will be used to commit the metadata. (not null)
     * @param metadata - The Statement Pattern node metadata that will be written to the table. (not null)
     */
    public void write(final TransactionBase tx, final StatementPatternMetadata metadata) {
        checkNotNull(tx);
        checkNotNull(metadata);

        final Bytes rowId = encoder.encode(metadata.getNodeId());
        tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId);
        tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
        tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, encoder.encode( metadata.getStatementPattern() ));
        tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
    }

    /**
     * Read an instance of {@link StatementPatternMetadata} from the Fluo table.
     *
     * @param sx - The snapshot that will be used to read the metadata. (not null)
     * @param nodeId - The nodeId of the Statement Pattern node that will be read. (not null)
     * @return The {@link StatementPatternMetadata} that was read from table.
     */
    public StatementPatternMetadata readStatementPatternMetadata(final SnapshotBase sx, final String nodeId) {
        return readStatementPatternMetadataBuilder(sx, nodeId).build();
    }

    private StatementPatternMetadata.Builder readStatementPatternMetadataBuilder(final SnapshotBase sx, final String nodeId) {
        checkNotNull(sx);
        checkNotNull(nodeId);

        // Fetch the values from the Fluo table.
        final Bytes rowId = encoder.encode(nodeId);
        final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
                FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER,
                FluoQueryColumns.STATEMENT_PATTERN_PATTERN,
                FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID));

        // Return an object holding them.
        final String varOrderString = encoder.decodeString( requireValue(values, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, nodeId) );
        final VariableOrder varOrder = new VariableOrder(varOrderString);

        final String pattern = encoder.decodeString( requireValue(values, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, nodeId) );
        final String parentNodeId = encoder.decodeString( requireValue(values, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, nodeId) );

        return StatementPatternMetadata.builder(nodeId)
                .setVarOrder(varOrder)
                .setStatementPattern(pattern)
                .setParentNodeId(parentNodeId);
    }

    /**
     * Write an instance of {@link FluoQuery} to the Fluo table.
     * <p>
     * Besides the metadata of every node in the query, this stores a
     * SPARQL-to-Query-ID entry under {@link FluoQueryColumns#QUERY_ID}.
     *
     * @param tx - The transaction that will be used to commit the metadata. (not null)
     * @param query - The query metadata that will be written to the table. (not null)
     */
    public void write(final TransactionBase tx, final FluoQuery query) {
        checkNotNull(tx);
        checkNotNull(query);

        // Store the Query ID so that it may be looked up from the original SPARQL string.
        final String sparql = query.getQueryMetadata().getSparql();
        final String queryId = query.getQueryMetadata().getNodeId();
        tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));

        // Write the rest of the metadata objects.
        write(tx, query.getQueryMetadata());

        for(final FilterMetadata filter : query.getFilterMetadata()) {
            write(tx, filter);
        }

        for(final JoinMetadata join : query.getJoinMetadata()) {
            write(tx, join);
        }

        for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
            write(tx, statementPattern);
        }
    }

    /**
     * Read an instance of {@link FluoQuery} from the Fluo table.
     *
     * @param sx - The snapshot that will be used to read the metadata from the Fluo table. (not null)
     * @param queryId - The ID of the query whose nodes will be read. (not null)
     * @return The {@link FluoQuery} that was read from table.
     */
    public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) {
        checkNotNull(sx);
        checkNotNull(queryId);

        // Starting at the query node, walk down the tree collecting every node's metadata.
        final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
        addChildMetadata(sx, fluoQueryBuilder, queryId);
        return fluoQueryBuilder.build();
    }

    /**
     * Reads the metadata of a node, adds it to the builder, and then does the
     * same for each of the node's children.
     *
     * @param sx - The snapshot that will be used to read the metadata. (not null)
     * @param builder - The builder the metadata is added to. (not null)
     * @param childNodeId - The ID of the node to read. (not null)
     */
    private void addChildMetadata(final SnapshotBase sx, final FluoQuery.Builder builder, final String childNodeId) {
        checkNotNull(sx);
        checkNotNull(builder);
        checkNotNull(childNodeId);

        // NOTE(review): get() is called without first checking that a node type
        // was resolved for the ID - confirm how NodeType reports an unrecognized ID.
        final NodeType childType = NodeType.fromNodeId(childNodeId).get();
        switch(childType) {
            case QUERY:
                // Add this node's metadata.
                final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
                builder.setQueryMetadata(queryBuilder);

                // Add its child's metadata.
                addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
                break;

            case JOIN:
                // Add this node's metadata.
                final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
                builder.addJoinMetadata(joinBuilder);

                // Add its children's metadata.
                final JoinMetadata joinMetadata = joinBuilder.build();
                addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
                addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
                break;

            case FILTER:
                // Add this node's metadata.
                final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
                builder.addFilterMetadata(filterBuilder);

                // Add its child's metadata.
                addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
                break;

            case STATEMENT_PATTERN:
                // Add this node's metadata. No recursion is done for this node type.
                final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
                builder.addStatementPatternBuilder(spBuilder);
                break;
        }
    }
}
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +import com.google.common.base.Objects; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Metadata that is specific to Join nodes. + */ +@Immutable +@ParametersAreNonnullByDefault +public class JoinMetadata extends CommonNodeMetadata { + + private final String parentNodeId; + private final String leftChildNodeId; + private final String rightChildNodeId; + + /** + * Constructs an instance of {@link JoinMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) + * @param parentNodeId - The node id of this node's parent. (not null) + * @param leftChildNodeId - One of the nodes whose results are being joined. (not null) + * @param rightChildNodeId - The other node whose results are being joined. (not null) + */ + public JoinMetadata( + final String nodeId, + final VariableOrder varOrder, + final String parentNodeId, + final String leftChildNodeId, + final String rightChildNodeId) { + super(nodeId, varOrder); + this.parentNodeId = checkNotNull(parentNodeId); + this.leftChildNodeId = checkNotNull(leftChildNodeId); + this.rightChildNodeId = checkNotNull(rightChildNodeId); + } + + /** + * @return The node id of this node's parent. + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * @return One of the nodes whose results are being joined. + */ + public String getLeftChildNodeId() { + return leftChildNodeId; + } + + /** + * @return The other node whose results are being joined. 
+ */ + public String getRightChildNodeId() { + return rightChildNodeId; + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.getNodeId(), + super.getVariableOrder(), + parentNodeId, + leftChildNodeId, + rightChildNodeId); + } + + @Override + public boolean equals(final Object o) { + if(o == this) { + return true; + } + + if(o instanceof JoinMetadata) { + if(super.equals(o)) { + final JoinMetadata joinMetadata = (JoinMetadata)o; + return new EqualsBuilder() + .append(parentNodeId, joinMetadata.parentNodeId) + .append(leftChildNodeId, joinMetadata.leftChildNodeId) + .append(rightChildNodeId, joinMetadata.rightChildNodeId) + .isEquals(); + } + return false; + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("Join Metadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Parent Node ID: " + parentNodeId + "\n") + .append(" Left Child Node ID: " + leftChildNodeId + "\n") + .append(" Right Child Node ID: " + rightChildNodeId + "\n") + .append("}") + .toString(); + } + + /** + * Creates a new {@link Builder} for this class. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @return A new {@link Builder} for this class. + */ + public static Builder builder(final String nodeId) { + return new Builder(nodeId); + } + + /** + * Builds instances of {@link JoinMetadata}. + */ + @ParametersAreNonnullByDefault + public static final class Builder { + + private final String nodeId; + private VariableOrder varOrder; + private String parentNodeId; + private String leftChildNodeId; + private String rightChildNodeId; + + /** + * Constructs an instance of {@link Builder}. + * + * @param nodeId - The node ID associated with the Join node this builder makes. 
(not null) + */ + public Builder(final String nodeId) { + this.nodeId = checkNotNull(nodeId); + } + + /** + * @return The node ID associated with the Join node this builder makes. + */ + public String getNodeId() { + return nodeId; + } + + /** + * Sets the variable order of the binding sets that are emitted by this node. + * + * @param varOrder - The variable order of the binding sets that are emitted by this node. + * @return This builder so that method invocation could be chained. + */ + public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * Sets the node id of this node's parent. + * + * @param parentNodeId - The node id of this node's parent. + * @return This builder so that method invocation could be chained. + */ + public Builder setParentNodeId(@Nullable final String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } + + /** + * Set one of the nodes whose results are being joined. + * + * @param leftChildNodeId - One of the nodes whose results are being joined. + * @return This builder so that method invocation could be chained. + */ + public Builder setLeftChildNodeId(@Nullable final String leftChildNodeId) { + this.leftChildNodeId = leftChildNodeId; + return this; + } + + /** + * Set the other node whose results are being joined. + * + * @param rightChildNodeId - The other node whose results are being joined. + * @return This builder so that method invocation could be chained. + */ + public Builder setRightChildNodeId(@Nullable final String rightChildNodeId) { + this.rightChildNodeId = rightChildNodeId; + return this; + } + + /** + * @return An instance of {@link JoinMetadata} built using this builder's values. 
+ */ + public JoinMetadata build() { + return new JoinMetadata( + nodeId, + varOrder, + parentNodeId, + leftChildNodeId, + rightChildNodeId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java new file mode 100644 index 0000000..1815177 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +import com.google.common.base.Objects; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Metadata that is specific to a Projection. + */ +@Immutable +@ParametersAreNonnullByDefault +public class QueryMetadata extends CommonNodeMetadata { + + private final String sparql; + private final String childNodeId; + + /** + * Constructs an instance of {@link QueryMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) + * @param sparql - The SPARQL query whose results are being updated by the Fluo app. (not null) + * @param childNodeId - The node whose results are projected to the query's SELECT variables. (not null) + */ + public QueryMetadata( + final String nodeId, + final VariableOrder varOrder, + final String sparql, + final String childNodeId) { + super(nodeId, varOrder); + this.sparql = checkNotNull(sparql); + this.childNodeId = checkNotNull(childNodeId); + } + + /** + * @return The SPARQL query whose results are being updated by the Fluo app. + */ + public String getSparql() { + return sparql; + } + + /** + * @return The node whose results are projected to the query's SELECT variables. 
+ */ + public String getChildNodeId() { + return childNodeId; + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.getNodeId(), + super.getVariableOrder(), + sparql, + childNodeId); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } + + if(o instanceof QueryMetadata) { + if(super.equals(o)) { + final QueryMetadata queryMetadata = (QueryMetadata)o; + return new EqualsBuilder() + .append(sparql, queryMetadata.sparql) + .append(childNodeId, queryMetadata.childNodeId) + .isEquals(); + } + return false; + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("QueryMetadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Child Node ID: " + childNodeId + "\n") + .append(" SPARQL: " + sparql + "\n") + .append("}") + .toString(); + } + + /** + * Creates a new {@link Builder} for this class. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @return A new {@link Builder} for this class. + */ + public static Builder builder(final String nodeId) { + return new Builder(nodeId); + } + + /** + * Builds instances of {@link QueryMetadata}. + */ + @ParametersAreNonnullByDefault + public static final class Builder { + + private final String nodeId; + private VariableOrder varOrder; + private String sparql; + private String childNodeId; + + /** + * Constructs an instance of {@link Builder}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + */ + public Builder(final String nodeId) { + this.nodeId = checkNotNull(nodeId); + } + + /** + * Set the variable order of binding sets that are emitted by this node. + * + * @param varOrder - The variable order of binding sets that are emitted by this node. + * @return This builder so that method invocations may be chained. 
+ */ + public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * Set the SPARQL query whose results are being updated by the Fluo app. + * + * @param sparql - The SPARQL query whose results are being updated by the Fluo app. + * @return This builder so that method invocations may be chained. + */ + public Builder setSparql(@Nullable final String sparql) { + this.sparql = sparql; + return this; + } + + /** + * Set the node whose results are projected to the query's SELECT variables. + * + * @param childNodeId - The node whose results are projected to the query's SELECT variables. + * @return This builder so that method invocations may be chained. + */ + public Builder setChildNodeId(@Nullable final String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + /** + * @return An instance of {@link QueryMetadata} build using this builder's values. + */ + public QueryMetadata build() { + return new QueryMetadata(nodeId, varOrder, sparql, childNodeId); + } + } +} \ No newline at end of file