http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java new file mode 100644 index 0000000..5cb9869 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; + +import java.util.Map.Entry; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Snapshot; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.TypedTransaction; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver; + +public class IncUpdateDAO { + + private static final StringTypeLayer stl = new StringTypeLayer(); + private static final WholeRowTripleResolver tr = new WholeRowTripleResolver(); + + private static RyaStatement deserializeTriple(final Bytes row) { + final byte[] rowArray = row.toArray(); + + RyaStatement rs = null; + try { + rs = tr.deserialize(TABLE_LAYOUT.SPO, new TripleRow(rowArray, new byte[0], new byte[0])); + } catch (final TripleRowResolverException e) { + e.printStackTrace(); + } + + return rs; + } + + private static String getTripleString(final RyaStatement rs) { + final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE; + final String pred = rs.getPredicate().getData() + TYPE_DELIM + URI_TYPE; + final String objData = rs.getObject().getData(); + final String objDataType = rs.getObject().getDataType().stringValue(); + + return subj + DELIM + pred + DELIM + objData + TYPE_DELIM + objDataType; + } + + public static String 
getTripleString(final Bytes row) { + return getTripleString(deserializeTriple(row)); + } + + /** + * Add a row, creating and closing a transaction. + * + * @param fluoClient - Creates connections to the Fluo table that will be written to. (not null) + * @param row - The Row ID. + * @param col - The Column. + * @param val - The value. + */ + public static void addRow(final FluoClient fluoClient, final String row, final Column col, final String val) { + checkNotNull(fluoClient); + try (TypedTransaction tx = stl.wrap(fluoClient.newTransaction())) { + addRow(tx, row, col, val); + tx.commit(); + } + } + + /** + * Writes an entry to the Fluo table. + * + * @param tx - The transaction that will be used. (not null) + * @param row - The Row ID. + * @param col - The Column. + * @param val - The value. + */ + public static void addRow(final TypedTransaction tx, final String row, final Column col, final String val) { + checkNotNull(tx); + tx.mutate().row(row).col(col).set(val); + } + + /** + * Print all statements in the repo for demo and diagnostic purposes. + * @param fluoClient + * @throws Exception + */ + public static void printTriples(final FluoClient fluoClient) throws Exception { + try (Snapshot snapshot = fluoClient.newSnapshot()) { + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.fetchColumn(Bytes.of("triples"), Bytes.of("SPO")); + + final RowIterator rowIter = snapshot.get(scanConfig); + + while (rowIter.hasNext()) { + final Entry<Bytes, ColumnIterator> row = rowIter.next(); + System.out.println("Triple: " + row.getKey().toString()); + } + } + } + +// /** +// * Print all bindings for the given queries. For demo's and diagnostics. 
+// * @param fluoClient +// * @param queryNames +// * @throws Exception +// */ +// public static void printQueryResults(final FluoClient fluoClient, +// final Map<String, String> queryNames) throws Exception { +// try (Snapshot snapshot = fluoClient.newSnapshot(); +// TypedTransaction tx1 = stl.wrap(fluoClient.newTransaction())) { +// +// final ScannerConfiguration scanConfig = new ScannerConfiguration(); +// scanConfig.fetchColumn(Bytes.of("query"), Bytes.of("bindingSet")); +// +// final RowIterator rowIter = snapshot.get(scanConfig); +// String sparqlRow = ""; +// System.out.println("*********************************************************"); +// +// while (rowIter.hasNext()) { +// final Entry<Bytes, ColumnIterator> row = rowIter.next(); +// final String[] joinInfo = row.getKey().toString() +// .split(NODEID_BS_DELIM); +// final String sparql = joinInfo[0]; +// final String bs = joinInfo[1]; +// if (!sparqlRow.equals(sparql)) { +// sparqlRow = sparql; +// System.out.println(); +// System.out.println(); +// System.out.println(queryNames.get(sparqlRow) +// + " has bindings: "); +// System.out.println(); +// } +// +// final String variables = tx1.get().row(sparqlRow).col(NODE_VARS).toString(); +// final String[] vars = variables.split(";"); +// final String[] bsVals = bs.split(DELIM); +// System.out.print("Bindingset: "); +// for (int i = 0; i < vars.length; i++) { +// System.out.print(vars[i] + " = " + bsVals[i] + " "); +// } +// System.out.println(); +// +// } +// +// System.out.println("*********************************************************"); +// } +// } + + /** + * Print all rows in the Fluo table for diagnostics. + * @param fluoClient + * @throws Exception + */ + public static void printAll(final FluoClient fluoClient) throws Exception { + final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n"; + System.out + .println("Printing all tables. 
Showing unprintable bytes and braces as {ff} and {{} and {}} where ff is the value in hexadecimal."); + System.out.format(FORMAT, "--Row--", "--Column Family--", + "--Column Qual--", "--Value--"); + // Use try with resource to ensure snapshot is closed. + try (Snapshot snapshot = fluoClient.newSnapshot()) { + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + // scanConfig.setSpan(Span.prefix("word:")); + + final RowIterator rowIter = snapshot.get(scanConfig); + + while (rowIter.hasNext()) { + final Entry<Bytes, ColumnIterator> row = rowIter.next(); + final ColumnIterator colIter = row.getValue(); + while (colIter.hasNext()) { + final Entry<Column, Bytes> column = colIter.next(); + // System.out.println(row.getKey() + " " + + // column.getKey().getFamily()+ " " + + // column.getKey().getQualifier() ); + System.out.format(FORMAT, to_String(row.getKey()), + to_String(column.getKey().getFamily()), + to_String(column.getKey().getQualifier()), + to_String(column.getValue())); + } + } + } + } + + /** + * convert a non-utf8 to string and show unprintable bytes as {xx} where x + * is hex. + * + * @param value + * @return + */ + static String to_String(final Bytes bytes) { + return to_String(bytes.toArray()); + } + + static String to_String(final byte[] bytes) { + final StringBuilder sb = new StringBuilder(); + for (final byte b : bytes) { + if ((b > 0x7e) || (b < 32)) { + sb.append("{"); + sb.append(Integer.toHexString(b & 0xff)); // Lop off the sign extended ones. + sb.append("}"); + } else if (b == '{' || b == '}') { // Escape the literal braces. + sb.append("{"); + sb.append((char) b); + sb.append("}"); + } else { + sb.append((char) b); + } + } + return sb.toString(); + } +} \ No newline at end of file
/**
 * String constants shared by the incremental update application: delimiters
 * used when building and parsing row values, the prefixes that node IDs start
 * with, and the datatype assigned to URI values.
 */
public class IncrementalUpdateConstants {

    // String constants used to create more easily parsed patterns.

    /** Separates the individual values inside a serialized triple or binding set. */
    public static final String DELIM = ":::";

    /** Delimiter for variable names. NOTE(review): no usage is visible in this file; confirm intent. */
    public static final String VAR_DELIM = ";";

    /** Separates a node ID from the binding set string that follows it in a row ID. */
    public static final String NODEID_BS_DELIM = "<<:>>";

    /** Join related delimiter. NOTE(review): no usage is visible in this file; confirm intent. */
    public static final String JOIN_DELIM = "<:>J<:>";

    /** Separates a value's data from its datatype. */
    public static final String TYPE_DELIM = "<<~>>";

    // Prefixes used in the construction of the ID for each node.

    /** Node ID prefix of Statement Pattern nodes. */
    public static final String SP_PREFIX = "STATEMENT_PATTERN";

    /** Node ID prefix of Join nodes. */
    public static final String JOIN_PREFIX = "JOIN";

    /** Node ID prefix of Filter nodes. */
    public static final String FILTER_PREFIX = "FILTER";

    /** Node ID prefix of Query nodes. */
    public static final String QUERY_PREFIX = "QUERY";

    /** The datatype string used for URI values. */
    public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";

    /** This class only holds constants and is not meant to be instantiated. */
    private IncrementalUpdateConstants() {
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; + +import io.fluo.api.client.TransactionBase; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.data.Span; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Updates the results of a Join node when one of its children has added a + * new Binding Set to its results. 
+ */ +@ParametersAreNonnullByDefault +public class JoinResultUpdater { + private static final Logger log = Logger.getLogger(JoinResultUpdater.class); + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + private final Encoder encoder = new StringEncoder(); + + /** + * Updates the results of a Join node when one of its children has added a + * new Binding Set to its results. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @param childId - The Node ID of the child whose results received a new Binding Set. (not null) + * @param childBindingSet - The Binding Set that was just emitted by child node. (not null) + * @param joinMetadata - The metadatat for the Join that has been notified. (not null) + */ + public void updateJoinResults( + final TransactionBase tx, + final String childId, + final BindingSet childBindingSet, + final JoinMetadata joinMetadata) { + checkNotNull(tx); + checkNotNull(childId); + checkNotNull(joinMetadata); + + // Read the Join metadata from the Fluo table. + final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray(); + + // Transform the Child binding set and varaible order values to be easier to work with. + final VariableOrder childVarOrder = getVarOrder(tx, childId); + final String[] childVarOrderArray = childVarOrder.toArray(); + final String childBindingSetString = FluoStringConverter.toBindingSetString(childBindingSet, childVarOrder.toArray()); + final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString); + + // Transform the Sibling binding set and varaible order values to be easier to work with. + final String leftChildId = joinMetadata.getLeftChildNodeId(); + final String rightChildId = joinMetadata.getRightChildNodeId(); + final String siblingId = leftChildId.equals(childId) ? 
rightChildId : leftChildId; + + final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); + final String[] siblingVarOrderArray = siblingVarOrder.toArray(); + + // Create a map that will be used later in this algorithm to create new Join result + // Binding Sets. It is initialized with all of the values that are in childBindingSet. + // The common values and any that are added on by the sibling will be overwritten + // for each sibling scan result. + final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); + final Map<String, String> joinBindingSet = Maps.newHashMap(); + for(int i = 0; i < childVarOrderArray.length; i++) { + joinBindingSet.put(childVarOrderArray[i], childBindingStrings[i]); + } + + // Create the prefix that will be used to scan for binding sets of the sibling node. + // This prefix includes the sibling Node ID and the common variable values from + // childBindingSet. + String bsPrefix = ""; + for(int i = 0; i < commonVars.size(); i++) { + if(bsPrefix.length() == 0) { + bsPrefix = childBindingStrings[i]; + } else { + bsPrefix += DELIM + childBindingStrings[i]; + } + } + bsPrefix = siblingId + NODEID_BS_DELIM + bsPrefix; + + // Scan the sibling node's binding sets for those that have the same + // common variable values as childBindingSet. These needs to be joined + // and inserted into the Join's results. It's possible that none of these + // results will be new Join results if they have already been created in + // earlier iterations of this algorithm. + final ScannerConfiguration sc1 = new ScannerConfiguration(); + sc1.setSpan(Span.prefix(bsPrefix)); + setScanColumnFamily(sc1, siblingId); + + try { + final RowIterator ri = tx.get(sc1); + while(ri.hasNext()) { + final ColumnIterator ci = ri.next().getValue(); + while(ci.hasNext()){ + // Get a sibling binding set. 
+ final String siblingBindingSetString = ci.next().getValue().toString(); + final String[] siblingBindingStrings = FluoStringConverter.toBindingStrings(siblingBindingSetString); + + // Overwrite the previous sibling's values to create a new join binding set. + for (int i = 0; i < siblingBindingStrings.length; i++) { + joinBindingSet.put(siblingVarOrderArray[i], siblingBindingStrings[i]); + } + final String joinBindingSetString = makeBindingSetString(joinVarOrder, joinBindingSet); + + // Write the join binding set to Fluo. + final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString); + final Column col = FluoQueryColumns.JOIN_BINDING_SET; + final Bytes value = encoder.encode(joinBindingSetString); + tx.set(row, col, value); + } + } + } catch (final Exception e) { + log.error("Error while scanning sibling binding sets to create new join results.", e); + } + } + + /** + * Fetch the {@link VariableOrder} of a query node. + * + * @param tx - The transaction that will be used to read the variable order. (not null) + * @param nodeId - The ID of the node to fetch. (not null) + * @return The {@link VariableOrder} of the node. + */ + private VariableOrder getVarOrder(final TransactionBase tx, final String nodeId) { + checkNotNull(tx); + checkNotNull(nodeId); + + final NodeType nodeType = NodeType.fromNodeId(nodeId).get(); + switch(nodeType) { + case STATEMENT_PATTERN: + return queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder(); + + case FILTER: + return queryDao.readFilterMetadata(tx, nodeId).getVariableOrder(); + + case JOIN: + return queryDao.readJoinMetadata(tx, nodeId).getVariableOrder(); + + default: + throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId); + } + } + + /** + * Assuming that the common variables between two children are already + * shifted to the left, find the common variables between them. 
+ * <p> + * Refer to {@link FluoQueryInitializer} to see why this assumption is being made. + * + * @param vars1 - The first child's variable order. (not null) + * @param vars2 - The second child's variable order. (not null) + * @return An ordered List of the common variables between the two children. + */ + public List<String> getCommonVars(final VariableOrder vars1, final VariableOrder vars2) { + checkNotNull(vars1); + checkNotNull(vars2); + + final List<String> commonVars = new ArrayList<>(); + + // Only need to iteratre through the shorted order's length. + final Iterator<String> vars1It = vars1.iterator(); + final Iterator<String> vars2It = vars2.iterator(); + while(vars1It.hasNext() && vars2It.hasNext()) { + final String var1 = vars1It.next(); + final String var2 = vars2It.next(); + + if(var1.equals(var2)) { + commonVars.add(var1); + } else { + // Because the common variables are left shifted, we can break once + // we encounter a pair that does not match. + break; + } + } + + return commonVars; + } + +// /** +// * Assuming that the common variables between two children are already +// * shifted to the left, find the common variables between them. +// * <p> +// * Refer to {@link FluoQueryInitializer} to see why this assumption is being made. +// * +// * @param vars1 - The first child's variable order. (not null) +// * @param vars2 - The second child's variable order. (not null) +// * @return An ordered List of the common variables between the two children. +// */ +// private List<String> getCommonVars(final String[] vars1, final String[] vars2) { +// checkNotNull(vars1); +// checkNotNull(vars2); +// +// final List<String> commonVars = new ArrayList<>(); +// +// // Only need to iteratre through the shorted order's length. 
+// final int shortestLen = Math.min(vars1.length, vars2.length); +// for(int i = 0; i < shortestLen; i++) { +// final String var1 = vars1[i]; +// final String var2 = vars2[i]; +// +// if(var1.equals(var2)) { +// commonVars.add(var1); +// } else { +// // Because the common variables are left shifted, we can break once +// // we encounter a pair that does not match. +// break; +// } +// } +// +// return commonVars; +// } + + /** + * Update a {@link ScannerConfiguration} to use the sibling node's binding + * set column for its scan. The column that will be used is determined by the + * node's {@link NodeType}. + * + * @param sc - The scan configuration that will be updated. (not null) + * @param siblingId - The Node ID of the sibling. (not null) + */ + private static void setScanColumnFamily(final ScannerConfiguration sc, final String siblingId) { + checkNotNull(sc); + checkNotNull(siblingId); + + // Determine which type of binding set the sibling is. + final Optional<NodeType> siblingType = NodeType.fromNodeId(siblingId); + if(!siblingType.isPresent()) { + throw new IllegalStateException("The child's sibling is not of a recognized type."); + } + + // Set the column to join with. + Column column; + switch(siblingType.get()) { + case STATEMENT_PATTERN: + column = FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET; + break; + case FILTER: + column = FluoQueryColumns.FILTER_BINDING_SET; + break; + case JOIN: + column = FluoQueryColumns.JOIN_BINDING_SET; + break; + default: + throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, or Filter."); + } + sc.fetchColumn(column.getFamily(), column.getQualifier()); + } + + /** + * Create a Binding Set String from a variable order and a map of bindings. + * + * @param varOrder - The resulting binding set's variable order. (not null) + * @param bindingSetValues - A map holding the variables and their values that will be + * included in the resulting binding set. 
+ * @return A binding set string build from the map using the prescribed variable order. + */ + private static String makeBindingSetString(final String[] varOrder, final Map<String, String> bindingSetValues) { + checkNotNull(varOrder); + checkNotNull(bindingSetValues); + + String bindingSetString = ""; + + for (final String joinVar : varOrder) { + if (bindingSetString.length() == 0) { + bindingSetString = bindingSetValues.get(joinVar); + } else { + bindingSetString = bindingSetString + DELIM + bindingSetValues.get(joinVar); + } + } + + return bindingSetString; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java new file mode 100644 index 0000000..e9f6243 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; + +import com.google.common.base.Optional; + +/** + * Represents the different types of nodes that a Query may have. + */ +public enum NodeType { + FILTER, + JOIN, + STATEMENT_PATTERN, + QUERY; + + /** + * Get the {@link NodeType} of a node based on its Node ID. + * + * @param nodeId - The Node ID of a node. (not null) + * @return The {@link NodeType} of the node if it could be derived from the + * node's ID, otherwise absent. 
+ */ + public static Optional<NodeType> fromNodeId(final String nodeId) { + checkNotNull(nodeId); + + NodeType type = null; + + if(nodeId.startsWith(SP_PREFIX)) { + type = STATEMENT_PATTERN; + } else if(nodeId.startsWith(FILTER_PREFIX)) { + type = FILTER; + } else if(nodeId.startsWith(JOIN_PREFIX)) { + type = JOIN; + } else if(nodeId.startsWith(QUERY_PREFIX)) { + type = QUERY; + } + + return Optional.fromNullable(type); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java new file mode 100644 index 0000000..e4d0155 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import io.fluo.api.client.TransactionBase; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Updates the results of a Query node when one of its children has added a + * new Binding Set to its results. + */ +@ParametersAreNonnullByDefault +public class QueryResultUpdater { + + private final Encoder encoder = new StringEncoder(); + + /** + * Updates the results of a Query node when one of its children has added a + * new Binding Set to its results. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @param childBindingSet - A binding set that the query's child node has emmitted. (not null) + * @param queryMetadata - The metadata of the Query whose results will be updated. (not null) + */ + public void updateQueryResults( + final TransactionBase tx, + final BindingSet childBindingSet, + final QueryMetadata queryMetadata) { + checkNotNull(tx); + checkNotNull(childBindingSet); + checkNotNull(queryMetadata); + + // Create the query's Binding Set from the child node's binding set. 
+ final VariableOrder queryVarOrder = queryMetadata.getVariableOrder(); + + final MapBindingSet queryBindingSet = new MapBindingSet(); + for(final String bindingName : queryVarOrder) { + final Binding binding = childBindingSet.getBinding(bindingName); + queryBindingSet.addBinding(binding); + } + final String queryBindingSetString = BindingSetStringConverter.toString(queryBindingSet, queryVarOrder); + + // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry. + final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString); + final Column col = FluoQueryColumns.QUERY_BINDING_SET; + final Bytes value = encoder.encode(queryBindingSetString); + tx.set(row, col, value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java new file mode 100644 index 0000000..35fcf52 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import io.fluo.api.types.StringEncoder; +import io.fluo.api.types.TypeLayer; + +public class StringTypeLayer extends TypeLayer { + + public StringTypeLayer() { + super(new StringEncoder()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java new file mode 100644 index 0000000..fbbae33 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.query.BindingSet; + +import io.fluo.api.types.TypedTransactionBase; + +/** + * Exports a single Binding Set that is a new result for a SPARQL query to some + * other location. + */ +@ParametersAreNonnullByDefault +public interface IncrementalResultExporter { + + /** + * Export a Binding Set that is a result of a SPARQL query. + * + * @param tx - The Fluo transaction this export is a part of. (not null) + * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null) + * @param bindingSetString - The binding set as it was represented within the + * Fluo application. (not null) + * @throws ResultExportException The result could not be exported. + */ + public void export(TypedTransactionBase tx, String queryId, BindingSet result) throws ResultExportException; + + /** + * A result could not be exported. + */ + public static class ResultExportException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link ResultExportException}. + * + * @param message - Explains why the exception was thrown. + */ + public ResultExportException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link ResultExportException}. + * + * @param message - Explains why the exception was thrown. + * @param cause - The exception that caused this one to be thrown. 
+ */ + public ResultExportException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java new file mode 100644 index 0000000..aae42ef --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import javax.annotation.ParametersAreNonnullByDefault; + +import com.google.common.base.Optional; + +import io.fluo.api.observer.Observer.Context; + +/** + * Builds instances of {@link IncrementalResultExporter} using the provided + * configurations. + */ +@ParametersAreNonnullByDefault +public interface IncrementalResultExporterFactory { + + /** + * Builds an instance of {@link IncrementalResultExporter} using the + * configurations that are provided. + * + * @param context - Contains the host application's configuration values + * and any parameters that were provided at initialization. (not null) + * @return An exporter if configurations were found in the context; otherwise absent. + * @throws IncrementalExporterFactoryException A non-configuration related + * problem has occurred and the exporter could not be created as a result. + * @throws ConfigurationException Thrown if configuration values were + * provided, but an instance of the exporter could not be initialized + * using them. This could be because they were improperly formatted, + * a required field was missing, or some other configuration based problem. + */ + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; + + /** + * Indicates a {@link IncrementalResultExporter} could not be created by a + * {@link IncrementalResultExporterFactory}. + */ + public static class IncrementalExporterFactoryException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link }. + * + * @param message - Explains why this exception is being thrown. + */ + public IncrementalExporterFactoryException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link }. + * + * @param message - Explains why this exception is being thrown. 
+ * @param cause - The exception that caused this one to be thrown. + */ + public IncrementalExporterFactoryException(final String message, final Throwable t) { + super(message, t); + } + } + + /** + * The configuration could not be interpreted because required fields were + * missing or a value wasn't properly formatted. + */ + public static class ConfigurationException extends IncrementalExporterFactoryException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + */ + public ConfigurationException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public ConfigurationException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java new file mode 100644 index 0000000..57938c7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; + +import javax.annotation.ParametersAreNonnullByDefault; + +import com.google.common.base.Optional; + +/** + * Contains common parsing functions that make it easier to interpret parameter maps. + */ +@ParametersAreNonnullByDefault +public abstract class ParametersBase { + + protected final Map<String, String> params; + + /** + * Constructs an instance of {@link ParametersBase}. + * + * @param params - The parameters that will be wrapped. (not null) + */ + public ParametersBase(final Map<String, String> params) { + this.params = checkNotNull(params); + } + + protected static void setBoolean(final Map<String, String> params, final String paramName, final boolean value) { + checkNotNull(params); + checkNotNull(paramName); + params.put(paramName, Boolean.toString(value)); + } + + protected static boolean getBoolean(final Map<String, String> params, final String paramName, final boolean defaultValue) { + checkNotNull(params); + checkNotNull(paramName); + + final Optional<String> paramValue = Optional.fromNullable( params.get(paramName) ); + return paramValue.isPresent() ? 
Boolean.parseBoolean( paramValue.get() ) : defaultValue; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java new file mode 100644 index 0000000..525f9c3 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.util.Map; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; + +import com.google.common.base.Optional; + +import io.fluo.api.observer.Observer; + +/** + * Provides read/write functions to the parameters map that is passed into an + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related + * to Rya PCJ exporting. + */ +@ParametersAreNonnullByDefault +public class RyaExportParameters extends ParametersBase { + + public static final String CONF_EXPORT_TO_RYA = "pcj.fluo.export.rya.enabled"; + public static final String CONF_ACCUMULO_INSTANCE_NAME = "pcj.fluo.export.rya.accumuloInstanceName"; + public static final String CONF_ZOOKEEPER_SERVERS = "pcj.fluo.export.rya.zookeeperServers"; + public static final String CONF_EXPORTER_USERNAME = "pcj.fluo.export.rya.exporterUsername"; + public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword"; + + /** + * Constructs an instance of {@link RyaExportParameters}. + * + * @param params - The parameters object that will be read/writen to. (not null) + */ + public RyaExportParameters(final Map<String, String> params) { + super(params); + } + + /** + * @param isExportToRya - {@code True} if the Fluo application should export + * to Rya; otherwise {@code false}. + */ + public void setExportToRya(final boolean isExportToRya) { + setBoolean(params, CONF_EXPORT_TO_RYA, isExportToRya); + } + + /** + * @return {@code True} if the Fluo application should export to Rya; otherwise + * {@code false}. Defaults to {@code false} if no value is present. + */ + public boolean isExportToRya() { + return getBoolean(params, CONF_EXPORT_TO_RYA, false); + } + + /** + * @param accumuloInstanceName - The name of the Accumulo instance the exporter will connect to. 
+ */ + public void setAccumuloInstanceName(@Nullable final String accumuloInstanceName) { + params.put(CONF_ACCUMULO_INSTANCE_NAME, accumuloInstanceName); + } + + /** + * @return The name of the Accumulo instance the exporter will connect to. + */ + public Optional<String> getAccumuloInstanceName() { + return Optional.fromNullable( params.get(CONF_ACCUMULO_INSTANCE_NAME) ); + } + + /** + * @param zookeeperServers - A semicolon delimited list of Zookeeper + * server hostnames for the zookeepers that provide connections ot the + * target Accumulo instance. + */ + public void setZookeeperServers(@Nullable final String zookeeperServers) { + params.put(CONF_ZOOKEEPER_SERVERS, zookeeperServers); + } + + /** + * @return A semicolon delimited list of Zookeeper server hostnames for the + * zookeepers that provide connections ot the target Accumulo instance. + */ + public Optional<String> getZookeeperServers() { + return Optional.fromNullable( params.get(CONF_ZOOKEEPER_SERVERS) ); + } + + /** + * @param exporterUsername - The username that will be used to export PCJ + * results to the destination Accumulo table. + */ + public void setExporterUsername(@Nullable final String exporterUsername) { + params.put(CONF_EXPORTER_USERNAME, exporterUsername); + } + + /** + * @return The username that will be used to export PCJ results to the + * destination Accumulo table. + */ + public Optional<String> getExporterUsername() { + return Optional.fromNullable( params.get(CONF_EXPORTER_USERNAME) ); + } + + /** + * @param exporterPassword - The password that will be used to export PCJ + * results to the destination Accummulo table. + */ + public void setExporterPassword(@Nullable final String exporterPassword) { + params.put(CONF_EXPORTER_PASSWORD, exporterPassword); + } + + /** + * @return The password that will be used to export PCJ + * results to the destination Accummulo table. 
+ */ + public Optional<String> getExporterPassword() { + return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java new file mode 100644 index 0000000..4d51798 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.openrdf.query.BindingSet; + +import io.fluo.api.data.Bytes; +import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; + +/** + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. + */ +public class RyaResultExporter implements IncrementalResultExporter { + + private final Connector accumuloConn; + private final PcjTables pcjTables; + + /** + * Constructs an instance of {@link RyaResultExporter}. + * + * @param accumuloConn - A connection to the Accumulo instance that hosts Rya PCJ tables. (not null) + * @param pcjTables - A utility used to interact with Rya's PCJ tables. (not null) + */ + public RyaResultExporter(final Connector accumuloConn, final PcjTables pcjTables) { + this.accumuloConn = checkNotNull(accumuloConn); + this.pcjTables = checkNotNull(pcjTables); + } + + @Override + public void export( + final TypedTransactionBase fluoTx, + final String queryId, + final BindingSet result) throws ResultExportException { + checkNotNull(fluoTx); + checkNotNull(queryId); + checkNotNull(result); + + // Get the name of the table the PCJ results will be written to. + final String pcjTableName = fluoTx.get(Bytes.of(queryId), FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); + + // Write the result to the PCJ table. 
+ try { + pcjTables.addResults(accumuloConn, pcjTableName, Collections.singleton(result)); + } catch (final PcjException e) { + throw new ResultExportException("A result could not be exported to the PCJ table in Accumulo.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java new file mode 100644 index 0000000..fd63314 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; + +import com.google.common.base.Optional; + +import io.fluo.api.observer.Observer.Context; +import mvm.rya.indexing.external.tupleSet.PcjTables; + +/** + * Creates instances of {@link RyaResultExporter}. + */ +public class RyaResultExporterFactory implements IncrementalResultExporterFactory { + + @Override + public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { + checkNotNull(context); + + // Wrap the context's parameters for parsing. 
+ final RyaExportParameters params = new RyaExportParameters( context.getParameters() ); + + if(params.isExportToRya()) { + final String accumuloInstance = params.getAccumuloInstanceName().get(); + final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ","); + final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers); + + try { + final String exporterUsername = params.getExporterUsername().get(); + final String exporterPassword = params.getExporterPassword().get(); + final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword)); + + final IncrementalResultExporter exporter = new RyaResultExporter(accumuloConn, new PcjTables()); + return Optional.of(exporter); + + } catch (final AccumuloException | AccumuloSecurityException e) { + throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e); + } + } else { + return Optional.absent(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java new file mode 100644 index 0000000..d3c5bb5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.BindingSet; + +import io.fluo.api.client.TransactionBase; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.types.TypedObserver; +import io.fluo.api.types.TypedTransactionBase; + +/** + * Notified when the results of a node have been updated to include a new Binding + * Set. This observer updates its parent if the new Binding Set effects the parent's + * results. 
+ */ +@ParametersAreNonnullByDefault +public abstract class BindingSetUpdater extends TypedObserver { + + // DAO + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + // Updaters + private final JoinResultUpdater joinUpdater = new JoinResultUpdater(); + private final FilterResultUpdater filterUpdater = new FilterResultUpdater(); + private final QueryResultUpdater queryUpdater = new QueryResultUpdater(); + + @Override + public abstract ObservedColumn getObservedColumn(); + + /** + * Create an {@link Observation} that defines the work that needs to be done. + * + * @param tx - The Fluo transaction being used for the observer notification. (not null) + * @param parsedRow - The RowID parsed into a Binding Set and Node ID. (not null) + * @return An {@link Observation} that defines the work that needs to be done. + */ + public abstract Observation parseObservation(TransactionBase tx, final BindingSetRow parsedRow); + + @Override + public final void process(final TypedTransactionBase tx, final Bytes row, final Column col) { + checkNotNull(tx); + checkNotNull(row); + checkNotNull(col); + + final Observation observation = parseObservation( tx, BindingSetRow.make(row) ); + final String observedNodeId = observation.getObservedNodeId(); + final BindingSet observedBindingSet = observation.getObservedBindingSet(); + final String parentNodeId = observation.getParentId(); + + // Figure out which node needs to handle the new metadata. 
+ final NodeType parentNodeType = NodeType.fromNodeId(parentNodeId).get(); + switch(parentNodeType) { + case QUERY: + final QueryMetadata parentQuery = queryDao.readQueryMetadata(tx, parentNodeId); + queryUpdater.updateQueryResults(tx, observedBindingSet, parentQuery); + break; + + case FILTER: + final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId); + try { + filterUpdater.updateFilterResults(tx, observedBindingSet, parentFilter); + } catch (final Exception e) { + throw new RuntimeException("Could not process a Filter node.", e); + } + break; + + case JOIN: + final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId); + joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin); + break; + + default: + throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, or Query, but was " + parentNodeType); + } + } + + /** + * Defines who just emitted a new Binding Set result, the Binding Set itself, + * and which node must now handle it. + */ + public static final class Observation { + + private final String observedNodeId; + private final BindingSet observedBindingSet; + private final String parentNodeId; + + /** + * Creates an instance of {@link Observation}. + * + * @param observedNodeId - The Node ID that just emitted a new Binding Set. (not null) + * @param observedBindingSet - A Binding Set that was just emitted. (not null) + * @param parentNodeId - The Node ID of the node that must handle the new Binding Set input. (not null) + */ + public Observation( + final String observedNodeId, + final BindingSet observedBindingSet, + final String parentNodeId) { + this.observedNodeId = checkNotNull(observedNodeId); + this.observedBindingSet = checkNotNull(observedBindingSet); + this.parentNodeId = checkNotNull(parentNodeId); + } + + /** + * @return The Node ID that just emitted a new Binding Set. 
+ */ + public String getObservedNodeId() { + return observedNodeId; + } + + /** + * @return A Binding Set that was just emitted. + */ + public BindingSet getObservedBindingSet() { + return observedBindingSet; + } + + /** + * @return The Node ID of the node that must handle the new Binding Set input. + */ + public String getParentId() { + return parentNodeId; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java new file mode 100644 index 0000000..c6b6303 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.openrdf.query.BindingSet; + +import io.fluo.api.client.TransactionBase; + +/** + * Notified when the results of a Filter have been updated to include a new + * {@link BindingSet}. This observer updates its parent if the new Binding Set + * effects the parent's results. + */ +public class FilterObserver extends BindingSetUpdater { + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.FILTER_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { + checkNotNull(tx); + checkNotNull(parsedRow); + + // Read the Filter metadata. + final String filterNodeId = parsedRow.getNodeId(); + final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId); + + // Read the Binding Set that was just emmitted by the Filter. + final String[] filterBindingStrings = parsedRow.getBindingStrings(); + final String[] filterVarOrder = filterMetadata.getVariableOrder().toArray(); + final BindingSet filterBindingSet = FluoStringConverter.toBindingSet(filterBindingStrings, filterVarOrder); + + // Figure out which node needs to handle the new metadata. 
+ final String parentNodeId = filterMetadata.getParentNodeId(); + + return new Observation(filterNodeId, filterBindingSet, parentNodeId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java new file mode 100644 index 0000000..980c7bc --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.openrdf.query.BindingSet; + +import io.fluo.api.client.TransactionBase; + +/** + * Notified when the results of a Join have been updated to include a new + * {@link BindingSet}. This observer updates its parent if the new Binding Set + * effects the parent's results. + */ +public class JoinObserver extends BindingSetUpdater { + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.JOIN_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { + checkNotNull(parsedRow); + + // Read the Join metadata. + final String joinNodeId = parsedRow.getNodeId(); + final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId); + + // Read the Binding Set that was just emmitted by the Join. + final String[] joinBindingStrings = parsedRow.getBindingStrings(); + final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray(); + final BindingSet joinBindingSet = FluoStringConverter.toBindingSet(joinBindingStrings, joinVarOrder); + + // Figure out which node needs to handle the new metadata. 
+ final String parentNodeId = joinMetadata.getParentNodeId(); + + return new Observation(joinNodeId, joinBindingSet, parentNodeId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java new file mode 100644 index 0000000..8a60039 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.types.TypedObserver; +import io.fluo.api.types.TypedTransactionBase; + +/** + * Performs incremental result exporting to the configured destinations. + */ +public class QueryResultObserver extends TypedObserver { + private static final Logger log = Logger.getLogger(QueryResultObserver.class); + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + /** + * Builders for each type of result exporter we support. + */ + private static final ImmutableSet<IncrementalResultExporterFactory> factories = + ImmutableSet.<IncrementalResultExporterFactory>builder() + .add(new RyaResultExporterFactory()) + .build(); + + /** + * The exporters that are configured. 
+ */ + private ImmutableSet<IncrementalResultExporter> exporters = null; + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, NotificationType.STRONG); + } + + /** + * Before running, determine which exporters are configured and set them up. + */ + @Override + public void init(final Context context) { + final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder(); + + for(final IncrementalResultExporterFactory builder : factories) { + try { + final Optional<IncrementalResultExporter> exporter = builder.build(context); + if(exporter.isPresent()) { + exportersBuilder.add(exporter.get()); + } + } catch (final IncrementalExporterFactoryException e) { + log.error("Could not initialize a result exporter.", e); + } + } + + exporters = exportersBuilder.build(); + } + + @Override + public void process(final TypedTransactionBase tx, final Bytes row, final Column col) { + // Read the SPARQL query and it Binding Set from the row id. + final String[] queryAndBindingSet = row.toString().split(NODEID_BS_DELIM); + final String queryId = queryAndBindingSet[0]; + final String bindingSetString = queryAndBindingSet[1]; + + // Fetch the query's Variable Order from the Fluo table. + final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId); + final String[] varOrder = queryMetadata.getVariableOrder().toArray(); + + // Export the result using each of the provided exporters. + final BindingSet result = FluoStringConverter.toBindingSet(bindingSetString, varOrder); + for(final IncrementalResultExporter exporter : exporters) { + try { + exporter.export(tx, queryId, result); + } catch (final ResultExportException e) { + log.error("Could not export a binding set for query '" + queryId + "'. 
Binding Set: " + bindingSetString); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java new file mode 100644 index 0000000..00f9b13 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.openrdf.query.BindingSet; + +import io.fluo.api.client.TransactionBase; + +/** + * Notified when the results of a Statement Pattern have been updated to include + * a new {@link BindingSet}. This observer updates its parent if the new + * Binding Set effects the parent's results. + */ +public class StatementPatternObserver extends BindingSetUpdater { + + // DAO + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { + checkNotNull(tx); + + // Read the Statement Pattern metadata. + final String spNodeId = parsedRow.getNodeId(); + final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId); + + // Read the Binding Set that was just emmitted by the Statement Pattern. + final String[] spBindingStrings = parsedRow.getBindingStrings(); + final String[] spVarOrder = spMetadata.getVariableOrder().toArray(); + final BindingSet spBindingSet = FluoStringConverter.toBindingSet(spBindingStrings, spVarOrder); + + // Figure out which node needs to handle the new metadata. + final String parentNodeId = spMetadata.getParentNodeId(); + + return new Observation(spNodeId, spBindingSet, parentNodeId); + } +} \ No newline at end of file
