http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java index 749a77d..d56574e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java @@ -30,18 +30,18 @@ import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; import com.google.common.base.Preconditions; /** - * This class processes {@link SpanBatchDeleteInformation} objects by - * deleting the entries in the Fluo Column corresponding to the {@link Span} - * of the BatchInformation object. This class will delete entries until the - * batch size is met, and then create a new SpanBatchDeleteInformation object - * with an updated Span whose starting point is the stopping point of this - * batch. If the batch limit is not met, then a new batch is not created and - * the task is complete. + * This class processes {@link SpanBatchDeleteInformation} objects by deleting the entries in the Fluo Column + * corresponding to the {@link Span} of the BatchInformation object. 
This class will delete entries until the batch size + * is met, and then create a new SpanBatchDeleteInformation object with an updated Span whose starting point is the + * stopping point of this batch. If the batch limit is not met, then a new batch is not created and the task is + * complete. * */ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { @@ -49,8 +49,8 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class); /** - * Process SpanBatchDeleteInformation objects by deleting all entries indicated - * by Span until batch limit is met. + * Process SpanBatchDeleteInformation objects by deleting all entries indicated by Span until batch limit is met. + * * @param tx - Fluo Transaction * @param row - Byte row identifying BatchInformation * @param batch - SpanBatchDeleteInformation object to be processed @@ -60,6 +60,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { super.processBatch(tx, row, batch); Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation); SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch; + Optional<String> nodeId = spanBatch.getNodeId(); Task task = spanBatch.getTask(); int batchSize = spanBatch.getBatchSize(); Span span = spanBatch.getSpan(); @@ -71,7 +72,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed."); break; case Delete: - rowCol = deleteBatch(tx, span, column, batchSize); + rowCol = deleteBatch(tx, nodeId, span, column, batchSize); break; case Update: log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater. 
Batch " + batch + " will not be processed."); @@ -90,7 +91,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { } } - private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) { + private Optional<RowColumn> deleteBatch(TransactionBase tx, Optional<String> nodeId, Span span, Column column, int batchSize) { log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column); RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build(); @@ -100,18 +101,39 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { int count = 0; boolean batchLimitMet = false; Bytes row = span.getStart().getRow(); + + //get prefix if nodeId is specified + Optional<Bytes> prefixBytes = Optional.empty(); + if (nodeId.isPresent()) { + NodeType type = NodeType.fromNodeId(nodeId.get()).get(); + prefixBytes = Optional.ofNullable(Bytes.of(type.getNodeTypePrefix())); + } + while (colScannerIter.hasNext() && !batchLimitMet) { ColumnScanner colScanner = colScannerIter.next(); row = colScanner.getRow(); - Iterator<ColumnValue> iter = colScanner.iterator(); - while (iter.hasNext()) { - if (count >= batchSize) { - batchLimitMet = true; - break; + + //extract the nodeId from the returned row if a nodeId was passed + //into the SpanBatchInformation. This is to ensure that the returned + //row nodeId is equal to the nodeId passed in to the span batch information + Optional<String> rowNodeId = Optional.empty(); + if (prefixBytes.isPresent()) { + rowNodeId = Optional.of(BindingSetRow.makeFromShardedRow(prefixBytes.get(), row).getNodeId()); + } + + //if nodeId is present, then results returned by span are filtered + //on the nodeId. 
This occurs when the hash is not included in the span + if (!rowNodeId.isPresent() || rowNodeId.equals(nodeId)) { + Iterator<ColumnValue> iter = colScanner.iterator(); + while (iter.hasNext()) { + if (count >= batchSize) { + batchLimitMet = true; + break; + } + ColumnValue colVal = iter.next(); + tx.delete(row, colVal.getColumn()); + count++; } - ColumnValue colVal = iter.next(); - tx.delete(row, colVal.getColumn()); - count++; } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java index 3b1e245..87158b7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java @@ -1,4 +1,8 @@ package org.apache.rya.indexing.pcj.fluo.app.batch; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Optional; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,22 +23,38 @@ package org.apache.rya.indexing.pcj.fluo.app.batch; */ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; /** * This class represents a batch order to delete all entries in the Fluo table indicated * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, * which uses this batch information along with the nodeId passed into the Observer to perform - * batch deletes. + * batch deletes. * */ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); - - public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { + private Optional<String> nodeId; + + /** + * Create a new SpanBatchInformation object. 
+ * @param nodeId - Optional nodeId that is used to filter returned results. Useful if the shard Id + * is not included in the Span (see {@link BindingHashShardingFunction} for more info about how sharded + * row keys are generated). + * @param batchSize - size of batch to be deleted + * @param column - column whose entries will be deleted + * @param span - Span indicating the range of data to delete. Sometimes the Span cannot contain the hash + * (for example, if you are deleting all of the results associated with a nodeId). In this case, a nodeId + * should be specified along with a Span equal to the prefix of the nodeId. + * @throws IllegalArgumentException if nodeId, column or span is null or if batchSize <= 0. + */ + public SpanBatchDeleteInformation(Optional<String> nodeId, int batchSize, Column column, Span span) { super(batchSize, Task.Delete, column, span); + checkNotNull(nodeId); + this.nodeId = nodeId; } - + /** * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} */ @@ -42,17 +62,42 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { public BatchBindingSetUpdater getBatchUpdater() { return updater; } - - + + /** + * Returns an Optional nodeId. If this value is specified, the results + * returned from the Fluo scan over the indicated range will be filtered + * by the nodeId. The nodeId allows results for a given query nodeId to be + * deleted using a Span even if the hash cannot be specified when forming the + * rowId in the table. 
+ * @return - the nodeId whose results will be batch deleted + */ + public Optional<String> getNodeId() { + return nodeId; + } + + @Override + public String toString() { + return new StringBuilder() + .append("Span Batch Information {\n") + .append(" Span: " + super.getSpan() + "\n") + .append(" Batch Size: " + super.getBatchSize() + "\n") + .append(" Task: " + super.getTask() + "\n") + .append(" Column: " + super.getColumn() + "\n") + .append(" NodeId: " + nodeId + "\n") + .append("}") + .toString(); + } + public static Builder builder() { return new Builder(); } - + public static class Builder { private int batchSize = DEFAULT_BATCH_SIZE; private Column column; private Span span; + private Optional<String> nodeId = Optional.empty(); /** * @param batchSize - {@link Task}s are applied in batches of this size @@ -74,19 +119,34 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { /** * @param span - span that batch {@link Task} will be applied to - * + * */ public Builder setSpan(Span span) { this.span = span; return this; } + /** + * Sets the nodeId whose results will be batch deleted. This optional value + * allows the {@link SpanBatchBindingSetUpdater} to filter on the indicated + * nodeId. Because the results of the Fluo table are sharded, if the Span does + * not include the shard, then it is not possible to scan exactly for all results + * pertaining to a specific nodeId. In the event that a user wants to delete all nodes + * related to a specific entry, this Optional nodeId should be specified to retrieve + * only the results associated with the indicated nodeId. 
+ * @param nodeId - node whose results will be batch deleted + * @return - Builder for chaining method calls + */ + public Builder setNodeId(Optional<String> nodeId) { + this.nodeId = nodeId; + return this; + } /** * @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder */ public SpanBatchDeleteInformation build() { - return new SpanBatchDeleteInformation(batchSize, column, span); + return new SpanBatchDeleteInformation(nodeId, batchSize, column, span); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java index 98deb8e..8644c31 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java @@ -1,4 +1,5 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; * under the License. 
*/ import java.lang.reflect.Type; +import java.util.Optional; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; @@ -37,10 +39,12 @@ import com.google.gson.JsonSerializer; * JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects. * */ -public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> { +public class SpanBatchInformationTypeAdapter + implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> { @Override - public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { JsonObject json = element.getAsJsonObject(); int batchSize = json.get("batchSize").getAsInt(); String[] colArray = json.get("column").getAsString().split("\u0000"); @@ -49,7 +53,12 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch boolean startInc = json.get("startInc").getAsBoolean(); boolean endInc = json.get("endInc").getAsBoolean(); Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc); - return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build(); + String nodeId = json.get("nodeId").getAsString(); + Optional<String> id = Optional.empty(); + if (!nodeId.isEmpty()) { + id = Optional.of(nodeId); + } + return SpanBatchDeleteInformation.builder().setNodeId(id).setBatchSize(batchSize).setSpan(span).setColumn(column).build(); } @Override @@ -63,6 +72,8 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow())); 
result.add("startInc", new JsonPrimitive(span.isStartInclusive())); result.add("endInc", new JsonPrimitive(span.isEndInclusive())); + String nodeId = batch.getNodeId().orElse(""); + result.add("nodeId", new JsonPrimitive(nodeId)); return result; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java index e33ea97..20a6b97 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.export.rya; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes; import java.util.Collection; import java.util.Map; @@ -54,11 +55,11 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter { private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class); private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); private final FluoClient fluo; - + public RyaSubGraphExporter(FluoClient fluo) { this.fluo = Preconditions.checkNotNull(fluo); } - + @Override public Set<QueryType> getQueryTypes() { return Sets.newHashSet(QueryType.CONSTRUCT); @@ -78,12 +79,12 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter { public void export(String constructID, RyaSubGraph subgraph) 
throws ResultExportException { insertTriples(fluo.newTransaction(), subgraph.getStatements()); } - + private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) { for (final RyaStatement triple : triples) { Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); try { - tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); + tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); } catch (final TripleRowResolverException e) { log.error("Could not convert a Triple into the SPO format: " + triple); } @@ -97,10 +98,10 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter { * @return The Rya SPO representation of the triple. * @throws TripleRowResolverException The triple could not be converted. */ - private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { + private static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException { checkNotNull(triple); final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); - return spoRow.getRow(); + return addTriplePrefixAndConvertToBytes(spoRow.getRow()); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java index 6147fa8..a8c4d58 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -54,8 +55,8 @@ public class AggregationObserver extends BindingSetUpdater { requireNonNull(tx); requireNonNull(row); - // Fetch the Aggregation node's metadata. - final String nodeId = BindingSetRow.make(row).getNodeId(); + // Make nodeId and fetch the Aggregation node's metadata. + final String nodeId = BindingSetRow.makeFromShardedRow(Bytes.of(AGGREGATION_PREFIX), row).getNodeId(); final AggregationMetadata metadata = queryDao.readAggregationMetadata(tx, nodeId); // Read the Visibility Binding Set from the value. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java index 09d9ede..01c9d73 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java @@ -18,6 +18,8 @@ */ package org.apache.rya.indexing.pcj.fluo.app.observers; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX; + import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; @@ -25,12 +27,12 @@ import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; /** * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new @@ -42,7 +44,7 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; public class ConstructQueryResultObserver extends AbstractObserver { private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class); - protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); + private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); @Override public ObservedColumn getObservedColumn() { @@ -53,18 +55,18 @@ public class ConstructQueryResultObserver extends AbstractObserver { public void process(TransactionBase tx, Bytes row, Column col) throws Exception { //Build row for parent that result will be written to - BindingSetRow bsRow = BindingSetRow.make(row); + BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(Bytes.of(CONSTRUCT_PREFIX), row); String constructNodeId = bsRow.getNodeId(); String bsString= bsRow.getBindingSetString(); String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString(); - String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString; + Bytes rowBytes = BindingHashShardingFunction.getShardedScanPrefix(parentNodeId, bsString); //Get NodeType of the parent node NodeType parentType = NodeType.fromNodeId(parentNodeId).get(); //Get data for the ConstructQuery result Bytes bytes = tx.get(row, col); //Write result to parent - tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes); + tx.set(rowBytes, parentType.getResultColumn(), bytes); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index b4edfea..844343c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -49,7 +50,7 @@ public class FilterObserver extends BindingSetUpdater { requireNonNull(row); // Read the Filter metadata. - final String filterNodeId = BindingSetRow.make(row).getNodeId(); + final String filterNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(FILTER_PREFIX), row).getNodeId(); final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId); // Read the Visibility Binding Set from the value. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index c56a98f..f3f409e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -49,7 +50,7 @@ public class JoinObserver extends BindingSetUpdater { requireNonNull(row); // Read the Join metadata. - final String joinNodeId = BindingSetRow.make(row).getNodeId(); + final String joinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(JOIN_PREFIX), row).getNodeId(); final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId); // Read the Visibility Binding Set from the value. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java index 7d96baa..87d0ca2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -53,7 +54,7 @@ public class PeriodicQueryObserver extends BindingSetUpdater { requireNonNull(row); // Read the Join metadata. - final String periodicBinNodeId = BindingSetRow.make(row).getNodeId(); + final String periodicBinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PERIODIC_QUERY_PREFIX), row).getNodeId(); final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId); // Read the Visibility Binding Set from the Value. 
@@ -65,6 +66,6 @@ public class PeriodicQueryObserver extends BindingSetUpdater { return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId); } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java index 5d73b2e..b77bf91 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -48,7 +49,7 @@ public class ProjectionObserver extends BindingSetUpdater { requireNonNull(row); // Read the Filter metadata. - final String projectionNodeId = BindingSetRow.make(row).getNodeId(); + final String projectionNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PROJECTION_PREFIX), row).getNodeId(); final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId); // Read the Visibility Binding Set from the value. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 78d0ec5..7fa4d38 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -18,13 +18,14 @@ */ package org.apache.rya.indexing.pcj.fluo.app.observers; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; @@ -50,7 +51,7 @@ import com.google.common.collect.ImmutableSet; public class QueryResultObserver extends AbstractObserver { private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class); - protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); + private final FluoQueryMetadataCache queryDao = 
MetadataCacheSupplier.getOrCreateCache(); /** * Builders for each type of {@link IncrementalBindingSetExporter} we support. */ @@ -97,10 +98,9 @@ public class QueryResultObserver extends AbstractObserver { @Override public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception { - final String row = brow.toString(); // Read the queryId from the row and get the QueryMetadata. - final String queryId = row.split(NODEID_BS_DELIM)[0]; + final String queryId = BindingSetRow.makeFromShardedRow(Bytes.of(QUERY_PREFIX), brow).getNodeId(); final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId); // Read the Child Binding Set that will be exported. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index 607267a..e3c5b95 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -48,8 +49,8 @@ public class StatementPatternObserver extends BindingSetUpdater { requireNonNull(tx); requireNonNull(row); - // Read the Statement Pattern metadata. 
- final String spNodeId = BindingSetRow.make(row).getNodeId(); + // Make nodeId and get the Statement Pattern metadata. + final String spNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), row).getNodeId(); final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId); // Read the Visibility Binding Set from the value. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index d6fd8bd..83517bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -19,24 +19,23 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; import java.util.Map; +import java.util.Set; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.Span; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; import 
org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -44,7 +43,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; import com.google.common.collect.Maps; /** @@ -56,11 +54,10 @@ public class TripleObserver extends AbstractObserver { private static final Logger log = LoggerFactory.getLogger(TripleObserver.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache(); + private final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache(); + private final StatementPatternIdCache SP_ID_CACHE = StatementPatternIdCacheSupplier.getOrCreateCache(); private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter(); - public TripleObserver() {} - @Override public ObservedColumn getObservedColumn() { return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG); @@ -71,53 +68,45 @@ public class TripleObserver extends AbstractObserver { // Get string representation of triple. 
final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow); log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement); + log.trace("Beginging to process triple."); final String triple = IncUpdateDAO.getTripleString(ryaStatement); - // Iterate over each of the Statement Patterns that are being matched against. - final RowScanner spScanner = tx.scanner() - .over(Span.prefix(SP_PREFIX)) - - // Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP. - .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN) - .byRow() - .build(); + Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx); //see if triple matches conditions of any of the SP - for (final ColumnScanner colScanner : spScanner) { - // Get the Statement Pattern's node id. - final String spID = colScanner.getsRow(); - + for (String spID: spIDs) { // Fetch its metadata. final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID); + log.trace("Retrieved metadata: {}", spMetadata); + // Attempt to match the triple against the pattern. final String pattern = spMetadata.getStatementPattern(); final VariableOrder varOrder = spMetadata.getVariableOrder(); final String bindingSetString = getBindingSet(triple, pattern, varOrder); + log.trace("Created binding set match string: {}", bindingSetString); + // Statement matches to a binding set. if(bindingSetString.length() != 0) { // Fetch the triple's visibility label. final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, ""); - // Create the Row ID for the emitted binding set. It does not contain visibilities. 
- final String row = spID + NODEID_BS_DELIM + bindingSetString; - final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) ); + //Make BindingSet and sharded row + final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder); + visBindingSet.setVisibility(visibility); + Bytes row = BindingHashShardingFunction.addShard(spID, varOrder, visBindingSet); // If this is a new Binding Set, then emit it. - if(tx.get(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) { - // Create the Binding Set that goes in the Node Value. It does contain visibilities. - final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder); - visBindingSet.setVisibility(visibility); - + if(tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) { try { final Bytes valueBytes = BS_SERDE.serialize(visBindingSet); log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n", tx.getStartTimestamp(), spID, visBindingSet); - tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes); + tx.set(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes); } catch(final Exception e) { log.error("Couldn't serialize a Binding Set. 
This value will be skipped.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 6ca0e8d..c30843d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -53,7 +53,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> - * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>* + * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>* * <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr> * <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> * <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr> @@ -109,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A 
Node ID of the node that feeds this node Binding Sets.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> - * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr> * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> @@ -171,7 +171,7 @@ public class FluoQueryColumns { public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet"); public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies"); public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType"); - + // Query Metadata columns. public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId"); public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars"); @@ -195,7 +195,7 @@ public class FluoQueryColumns { public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId"); public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId"); public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet"); - + // Periodic Bin Metadata columns. 
public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId"); public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder"); @@ -206,7 +206,7 @@ public class FluoQueryColumns { public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize"); public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit"); public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable"); - + // Join Metadata columns. public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId"); public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder"); @@ -246,6 +246,13 @@ public class FluoQueryColumns { public static final Column BATCH_COLUMN = new Column("batch","information"); /** + * Column indicating a set of all StatementPattern ids in the Fluo table. This is used + * by the TripleObserver for finding new queries to match incoming triples to. + */ + public static final Column STATEMENT_PATTERN_IDS = new Column("statementPattern", "ids"); + public static final Column STATEMENT_PATTERN_IDS_HASH = new Column("statementPattern", "hash"); + + /** * Enumerates the {@link Column}s that hold all of the fields for each type * of node that can compose a query. */ @@ -261,7 +268,7 @@ public class FluoQueryColumns { QUERY_TYPE, QUERY_EXPORT_STRATEGIES, QUERY_CHILD_NODE_ID)), - + /** * The columns a {@link ProjectionMetadata} object's fields are stored within. */ @@ -271,8 +278,8 @@ public class FluoQueryColumns { PROJECTION_VARIABLE_ORDER, PROJECTION_PARENT_NODE_ID, PROJECTION_CHILD_NODE_ID)), - - + + /** * The columns a {@link PeriodicBinMetadata} object's fields are stored within.
*/ @@ -297,7 +304,7 @@ public class FluoQueryColumns { CONSTRUCT_PARENT_NODE_ID, CONSTRUCT_STATEMENTS)), - + /** * The columns a {@link FilterMetadata} object's fields are stored within. */ @@ -317,7 +324,7 @@ public class FluoQueryColumns { JOIN_TYPE, JOIN_PARENT_NODE_ID, JOIN_LEFT_CHILD_NODE_ID, - JOIN_BATCH_SIZE, + JOIN_BATCH_SIZE, JOIN_RIGHT_CHILD_NODE_ID)), /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java index 8adc40d..b1b4076 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java @@ -18,8 +18,7 @@ */package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkArgument; - -import java.util.concurrent.Callable; +import static com.google.common.base.Preconditions.checkNotNull; import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.data.Bytes; @@ -32,15 +31,18 @@ import com.google.common.base.Optional; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; + /** * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the - * data. + * data. 
The cache has a fixed capacity (determined at construction time), and evicts the least recently used entries + * when space is needed. * */ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); + private final FluoQueryMetadataDAO dao; private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache; private final Cache<String, Bytes> metadataCache; @@ -49,10 +51,15 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { /** * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. - * * @param capacity - max size of the cache + * @param concurrencyLevel - indicates how the cache will be partitioned so that different threads can access those + * partitions in a non-serialized manner + * @throws IllegalArgumentException if dao is null, capacity <= 0, or concurrencyLevel <= 0 */ public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { + checkNotNull(dao); + checkArgument(capacity > 0); + checkArgument(concurrencyLevel > 0); this.dao = dao; commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); @@ -75,166 +82,204 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { return concurrencyLevel; } + + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#STATEMENT_PATTERN}.
+ */ @Override public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); - + checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); try { - checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId); - return dao.readStatementPatternMetadata(tx, nodeId); - } + return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId); + return dao.readStatementPatternMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#JOIN}. 
+ */ @Override public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.JOIN); try { - checkArgument(type.isPresent() && type.get() == NodeType.JOIN); LOG.debug("Retrieving Metadata from Cache: {}.", nodeId); - return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readJoinMetadata(tx, nodeId); - } + return (JoinMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readJoinMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#FILTER}. 
+ */ @Override public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.FILTER); try { - checkArgument(type.isPresent() && type.get() == NodeType.FILTER); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readFilterMetadata(tx, nodeId); - } + return (FilterMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readFilterMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#PROJECTION}. 
+ */ @Override public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION); - LOG.debug("Retrieving Metadata from Cache: {}", nodeId); try { - return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readProjectionMetadata(tx, nodeId); - } + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readProjectionMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#AGGREGATION}. 
+ */ @Override public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION); try { - checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readAggregationMetadata(tx, nodeId); - } + return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readAggregationMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#CONSTRUCT}. 
+ */ @Override public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT); try { - checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readConstructQueryMetadata(tx, nodeId); - } + return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readConstructQueryMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#PERIODIC_QUERY}. 
+ */ @Override public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY); try { - checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readPeriodicQueryMetadata(tx, nodeId); - } + return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readPeriodicQueryMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e); } } + /** + * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)} + * does not return an Optional containing {@link NodeType#QUERY}. 
+ */ @Override public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) { + checkNotNull(nodeId); + checkNotNull(tx); Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.QUERY); try { - checkArgument(type.isPresent() && type.get() == NodeType.QUERY); LOG.debug("Retrieving Metadata from Cache: {}", nodeId); - return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { - @Override - public CommonNodeMetadata call() throws Exception { - LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); - return dao.readQueryMetadata(tx, nodeId); - } + return (QueryMetadata) commonNodeMetadataCache.get(nodeId, () -> { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readQueryMetadata(tx, nodeId); }); } catch (Exception e) { throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e); } } + /** + * Reads specific metadata entries from the cache. This method will retrieve the entry + * from the Fluo table if it does not already exist in the cache. 
+ * @param tx - Transaction for interacting with Fluo + * @param rowId - rowId for metadata entry + * @param column - column of metadata entry + * @return - value associated with the metadata entry + */ public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) { + checkNotNull(rowId); + checkNotNull(tx); + checkNotNull(column); Optional<NodeType> type = NodeType.fromNodeId(rowId); + checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column)); try { - checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column)); - return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() { - @Override - public Bytes call() throws Exception { - return tx.get(Bytes.of(rowId), column); - } - }); + return metadataCache.get(getKey(rowId, column), () -> tx.get(Bytes.of(rowId), column)); } catch (Exception e) { throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e); } } + /** + * Deletes contents of cache. 
+ */ + public void clear() { + commonNodeMetadataCache.asMap().clear(); + metadataCache.asMap().clear(); + } + private String getKey(String row, Column column) { return row + ":" + column.getsQualifier() + ":" + column.getsQualifier(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index c132ad4..55e521e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -47,6 +47,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -617,15 +618,19 @@ public class FluoQueryMetadataDAO { write(tx, join); } + Set<String> ids = new HashSet<>(); for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) { write(tx, statementPattern); + ids.add(statementPattern.getNodeId()); } + StatementPatternIdManager.addStatementPatternIds(tx, Sets.newHashSet(ids)); for(final AggregationMetadata aggregation : query.getAggregationMetadata()) { write(tx, aggregation); } } + /** * Read an instance of {@link FluoQuery} from the Fluo table. 
* http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java index faab952..761100d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java @@ -1,8 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.rya.indexing.pcj.fluo.app.query; +import java.util.concurrent.locks.ReentrantLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Manages the creation of the {@link FluoQueryMetadataCache} in the Fluo application. + * This supplier enforces singleton like behavior in that it will only create the cache if it + * doesn't already exist. 
The FluoQueryMetadataCache is not a singleton in itself. + */ public class MetadataCacheSupplier { private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class); @@ -10,6 +35,7 @@ public class MetadataCacheSupplier { private static boolean initialized = false; private static final int DEFAULT_CAPACITY = 10000; private static final int DEFAULT_CONCURRENCY = 8; + private static final ReentrantLock lock = new ReentrantLock(); /** * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the @@ -19,21 +45,26 @@ public class MetadataCacheSupplier { * @param concurrencyLevel - concurrencyLevel used to create a new cache */ public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) { - if (!initialized) { - LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity, - concurrencyLevel); - CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel); - initialized = true; - } else { - LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}", - CACHE.getCapacity(), CACHE.getConcurrencyLevel()); + lock.lock(); + try { + if (!initialized) { + LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity, + concurrencyLevel); + CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel); + initialized = true; + } else { + LOG.warn( + "A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created. 
Returning existing cache with capacity: {} and concurrencylevel: {}", + capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel()); + } + return CACHE; + } finally { + lock.unlock(); } - return CACHE; } /** - * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it - * with a default size of 10000 entries and a default concurrency level of 8. + * Creates a FluoQueryMetadataCache with a default size of 10000 entries and a default concurrency level of 8. * * @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency */ @@ -41,4 +72,21 @@ public class MetadataCacheSupplier { return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY); } + /** + * Clears contents of cache and makes supplier uninitialized so that it creates a new cache. + * This is useful for integration tests. + */ + public static void clear() { + lock.lock(); + try{ + if(initialized) { + CACHE.clear(); + CACHE = null; + initialized = false; + } + } finally { + lock.unlock(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java new file mode 100644 index 0000000..f1ddb02 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.collect.Sets; + +/** + * This class caches the StatementPattern Ids so they don't have + * to be looked up each time a new Statement needs to be processed + * in the TripleObserver. The cached hash and id set are static, so they are shared by every + * instance of this class and are guarded by a single static lock. + * + */ +public class StatementPatternIdCache { + + private static final ReentrantLock lock = new ReentrantLock(); + private static volatile Optional<String> HASH = Optional.empty(); + private static volatile Set<String> IDS = new HashSet<>(); + + /** + * This method retrieves the StatementPattern NodeIds registered in the Fluo table.
+ * To determine whether the StatementPattern NodeIds have changed in the underlying Fluo table, + * this class maintains a local hash of the ids. When this method is called, it looks up the + * hash of the StatementPattern Id Strings in the Fluo table, and only if it is different + * than the local hash will the StatementPattern nodeIds be retrieved from the Fluo table. Otherwise, + * this method returns a local cache of the StatementPattern nodeIds. This method is thread safe. + * @param tx - Fluo transaction used to read the stored hash and, when the hash has changed, the stored ids + * @return - Set of StatementPattern nodeIds + */ + public Set<String> getStatementPatternIds(TransactionBase tx) { + checkNotNull(tx); + Optional<Bytes> hashBytes = Optional.ofNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH)); + if (hashBytes.isPresent()) { + String hash = hashBytes.get().toString(); + if (hash.equals(HASH.orElse(null))) { + return IDS; + } + lock.lock(); + try { + String ids = tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS).toString(); + IDS = ids.isEmpty() ? new HashSet<String>() : Sets.newHashSet(ids.split(VAR_DELIM)); + HASH = Optional.of(hash); + return IDS; + } finally { + lock.unlock(); + } + } + return IDS; + } + + /** + * Clears contents of cache so that it will be re-populated next time + * {@link StatementPatternIdCache#getStatementPatternIds(TransactionBase)} is called. The id set is replaced rather than + * emptied in place, so a Set previously returned to a caller is not modified.
+ */ + public void clear() { + lock.lock(); + try { + HASH = Optional.empty(); + IDS = new HashSet<>(); + } finally { + lock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java new file mode 100644 index 0000000..01264dc --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the creation of the {@link StatementPatternIdCache} in the Fluo application.
+ * This supplier enforces singleton like behavior in that it will only create the cache if it + * doesn't already exist. The StatementPatternIdCache is not a singleton in itself. + */ +public class StatementPatternIdCacheSupplier { + + private static final Logger LOG = LoggerFactory.getLogger(StatementPatternIdCacheSupplier.class); + private static boolean initialized = false; + private static StatementPatternIdCache CACHE; + private static final ReentrantLock lock = new ReentrantLock(); + + /** + * Returns the existing cache if one has been created, otherwise creates a new cache. The check and the + * creation are both performed while holding this class's lock. + * + * @return - existing StatementPatternIdCache or new cache if one didn't already exist + */ + public static StatementPatternIdCache getOrCreateCache() { + lock.lock(); + try { + if (!initialized) { + LOG.debug("Cache has not been initialized. Initializing StatementPatternIdCache"); + CACHE = new StatementPatternIdCache(); + initialized = true; + } else { + LOG.debug("A StatementPatternIdCache has already been initialized."); + } + return CACHE; + } finally { + lock.unlock(); + } + } + + /** + * Clears the stored cache, drops the reference to it, and flags the Supplier as uninitialized so that the + * next call to getOrCreateCache() creates a new cache. Does nothing if no cache has been created.
+ */ + public static void clear() { + lock.lock(); + try { + if (initialized) { + CACHE.clear(); + CACHE = null; + initialized = false; + } + } finally { + lock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java new file mode 100644 index 0000000..ee4c053 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + + /** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+ * @param ids - ids to add to the StatementPattern nodeId Set; an empty Set leaves the stored id String unchanged + */ + public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) { + checkNotNull(tx); + checkNotNull(ids); + Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)); + StringBuilder builder = new StringBuilder(); + if (val.isPresent() && !val.get().toString().isEmpty()) { + builder.append(val.get().toString()); + if (!ids.isEmpty()) { + builder.append(VAR_DELIM); + } + } + String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString(); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); + } + + /** + * Remove specified Set of ids from the Fluo table and updates the entry with Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS} so that it holds the ids that remain. Also updates the hash of the updated nodeId Set and writes that + * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx - Fluo Transaction object for performing atomic operations on Fluo table. + * @param ids - ids to remove from the StatementPattern nodeId Set + */ + public static void removeStatementPatternIds(TransactionBase tx, Set<String> ids) { + checkNotNull(tx); + checkNotNull(ids); + Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)); + Set<String> storedIds = new HashSet<>(); + if (val.isPresent()) { + storedIds = Sets.newHashSet(val.get().toString().split(VAR_DELIM)); + } + storedIds.removeAll(ids); + String idString = Joiner.on(VAR_DELIM).join(storedIds); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); + } + +}
