RYA-283-Batch-Observer-Integration. Closes #198.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/ad60aca8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/ad60aca8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/ad60aca8 Branch: refs/heads/master Commit: ad60aca8d8a1002b528aeaa4376e86925dd0b2e0 Parents: e387818 Author: Caleb Meier <caleb.me...@parsons.com> Authored: Sat Aug 5 16:53:49 2017 -0700 Committer: Caleb Meier <caleb.me...@parsons.com> Committed: Fri Aug 18 17:04:45 2017 -0700 ---------------------------------------------------------------------- .../indexing/pcj/fluo/api/CreateFluoPcj.java | 17 +- .../pcj/fluo/app/JoinResultUpdater.java | 146 ++++--- .../batch/AbstractBatchBindingSetUpdater.java | 2 +- .../fluo/app/batch/JoinBatchInformation.java | 36 +- .../JoinBatchInformationTypeAdapter.java | 4 +- .../pcj/fluo/app/query/FluoQueryColumns.java | 3 + .../fluo/app/query/FluoQueryMetadataDAO.java | 4 + .../pcj/fluo/app/query/JoinMetadata.java | 34 +- .../pcj/fluo/app/query/QueryMetadata.java | 22 + .../fluo/app/query/SparqlFluoQueryBuilder.java | 12 + .../pcj/fluo/app/util/FluoQueryUtils.java | 12 + .../pcj/fluo/app/util/NodeIdCollector.java | 92 ++++ .../BatchInformationSerializerTest.java | 5 +- .../fluo/app/query/QueryBuilderVisitorTest.java | 4 - .../pcj/fluo/integration/BatchDeleteIT.java | 316 -------------- .../indexing/pcj/fluo/integration/BatchIT.java | 424 +++++++++++++++++++ .../pcj/fluo/integration/KafkaExportIT.java | 3 - 17 files changed, 702 insertions(+), 434 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java index e450960..150a256 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java @@ -47,9 +47,7 @@ import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjException; @@ -90,6 +88,10 @@ public class CreateFluoPcj { * The default Statement Pattern batch insert size is 1000. */ private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; + /** + * The default Join batch size is 5000. + */ + private static final int DEFAULT_JOIN_BATCH_SIZE = 5000; /** * The maximum number of binding sets that will be inserted into each Statement @@ -98,11 +100,15 @@ public class CreateFluoPcj { private final int spInsertBatchSize; /** + * The maximum number of join results that will be processed per transaction. + */ + private final int joinBatchSize; + /** * Constructs an instance of {@link CreateFluoPcj} that uses * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size. 
*/ public CreateFluoPcj() { - this(DEFAULT_SP_INSERT_BATCH_SIZE); + this(DEFAULT_SP_INSERT_BATCH_SIZE, DEFAULT_JOIN_BATCH_SIZE); } /** @@ -111,9 +117,11 @@ public class CreateFluoPcj { * @param spInsertBatchSize - The maximum number of binding sets that will be * inserted into each Statement Pattern's result set per Fluo transaction. */ - public CreateFluoPcj(final int spInsertBatchSize) { + public CreateFluoPcj(final int spInsertBatchSize, final int joinBatchSize) { checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."); + checkArgument(joinBatchSize > 0, "The Join batch size '" + joinBatchSize + "' must be greater than 0."); this.spInsertBatchSize = spInsertBatchSize; + this.joinBatchSize = joinBatchSize; } @@ -173,6 +181,7 @@ public class CreateFluoPcj { SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); builder.setFluoQueryId(queryId); builder.setSparql(sparql); + builder.setJoinBatchSize(joinBatchSize); return builder.build(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index 0f448a6..fb3ee0c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -23,6 +23,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import java.util.ArrayList; +import 
java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -33,14 +34,18 @@ import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; import org.apache.log4j.Logger; import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.batch.AbstractBatchBindingSetUpdater; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -51,7 +56,6 @@ import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -122,14 +126,17 @@ public class JoinResultUpdater { } // Iterates over the sibling node's BindingSets that join with the new binding set. 
- final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childNodeId, childBindingSet, siblingId, tx); - + Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>(); + Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, siblingId); + Column siblingColumn = getScanColumnFamily(siblingId); + Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize()); + // Iterates over the resulting BindingSets from the join. final Iterator<VisibilityBindingSet> newJoinResults; if(emittingSide == Side.LEFT) { - newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets); + newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets.iterator()); } else { - newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet); + newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets.iterator(), childBindingSet); } // Insert the new join binding sets to the Fluo table. @@ -152,6 +159,22 @@ public class JoinResultUpdater { tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes); } } + + // if batch limit met, there are additional entries to process + // update the span and register updated batch job + if (rowColumn.isPresent()) { + Span newSpan = AbstractBatchBindingSetUpdater.getNewSpan(rowColumn.get(), siblingSpan); + JoinBatchInformation joinBatch = JoinBatchInformation.builder() + .setBatchSize(joinMetadata.getJoinBatchSize()) + .setBs(childBindingSet) + .setColumn(siblingColumn) + .setJoinType(joinMetadata.getJoinType()) + .setSide(emittingSide) + .setSpan(newSpan) + .setTask(Task.Add) + .build(); + BatchInformationDAO.addBatch(tx, joinMetadata.getNodeId(), joinBatch); + } } /** @@ -160,8 +183,55 @@ public class JoinResultUpdater { public static enum Side { LEFT, RIGHT; } + + + /** + * Fetches batch to be processed by scanning over the Span specified by the + * {@link JoinBatchInformation}. 
The number of results is less than or equal + * to the given {@code batchSize}. + * + * @param tx - Fluo transaction in which batch operation is performed + * @param siblingSpan - span of sibling to retrieve elements to join with + * @param bsSet - set that batch results are added to + * @return {@code Optional} holding the {@link RowColumn} of the last row scanned if the batch size was reached (more rows may remain), or absent if the scan ended first. + * @throws Exception - if scanning or deserializing a sibling BindingSet fails + */ + private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int batchSize) throws Exception { + + RowScanner rs = tx.scanner().over(siblingSpan).fetch(siblingColumn).byRow().build(); + Iterator<ColumnScanner> colScannerIter = rs.iterator(); + + boolean batchLimitMet = false; + Bytes row = siblingSpan.getStart().getRow(); + while (colScannerIter.hasNext() && !batchLimitMet) { + ColumnScanner colScanner = colScannerIter.next(); + row = colScanner.getRow(); + Iterator<ColumnValue> iter = colScanner.iterator(); + while (iter.hasNext() && !batchLimitMet) { + bsSet.add(BS_SERDE.deserialize(iter.next().getValue())); + //check if batch size has been met and set flag if it has been met + if (bsSet.size() >= batchSize) { + batchLimitMet = true; + } + } + } - private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException { + if (batchLimitMet) { + return Optional.of(new RowColumn(row, siblingColumn)); + } else { + return Optional.absent(); + } + } + + /** + * Creates a Span for the sibling node to retrieve BindingSets to join with. + * @param tx - Fluo transaction used to look up the variable orders of the child and sibling nodes + * @param childId - Id of the node that was updated + * @param childBindingSet - BindingSet update + * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update + * @return Span to retrieve sibling node's BindingSets to form join results + */ + private Span getSpan(TransactionBase tx, final String
childId, final BindingSet childBindingSet, final String siblingId) { // Get the common variable orders. These are used to build the prefix. final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); @@ -184,15 +254,7 @@ public class JoinResultUpdater { } } siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix; - - // Scan the sibling node's binding sets for those that have the same - // common variable values as childBindingSet. These needs to be joined - // and inserted into the Join's results. It's possible that none of these - // results will be new Join results if they have already been created in - // earlier iterations of this algorithm. - - final RowScanner rs = tx.scanner().over(Span.prefix(siblingScanPrefix)).fetch(getScanColumnFamily(siblingId)).byRow().build(); - return new FluoTableIterator(rs); + return Span.prefix(siblingScanPrefix); } @@ -468,56 +530,4 @@ public class JoinResultUpdater { } } - /** - * Iterates over rows that have a Binding Set column and returns the unmarshalled - * {@link BindingSet}s. - */ - private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> { - - private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - - private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet( - FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, - FluoQueryColumns.JOIN_BINDING_SET, - FluoQueryColumns.FILTER_BINDING_SET); - - private final Iterator<ColumnScanner> rows; - - /** - * Constructs an instance of {@link FluoTableIterator}. - * - * @param rows - Iterates over RowId values in a Fluo Table. 
(not null) - */ - public FluoTableIterator(final RowScanner rows) { - this.rows = checkNotNull(rows).iterator(); - } - - @Override - public boolean hasNext() { - return rows.hasNext(); - } - - @Override - public VisibilityBindingSet next() { - final ColumnScanner columns = rows.next(); - - for (final ColumnValue cv : columns) { - if(BINDING_SET_COLUMNS.contains(cv.getColumn())) { - final Bytes value = cv.getValue(); - try { - return BS_SERDE.deserialize(value); - } catch (final Exception e) { - throw new RuntimeException("Row did not containing a Binding Set.", e); - } - } - } - - throw new RuntimeException("Row did not containing a Binding Set."); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove() is unsupported."); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java index db33d3b..9584a10 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java @@ -38,7 +38,7 @@ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetU * @param oldSpan - old Span to be updated with newStart * @return - updated Span used with an updated BatchInformation object to complete the batch task */ - public Span getNewSpan(RowColumn newStart, Span oldSpan) { + public static Span getNewSpan(RowColumn 
newStart, Span oldSpan) { return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java index 71ac557..d049ff0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java @@ -37,8 +37,7 @@ import jline.internal.Preconditions; * VariableOrder are specified. This is so that the sibling node (the node that * wasn't updated) can be scanned to obtain results that can be joined with the * VisibilityBindingSet. The assumption here is that the Span is derived from - * the {@link Binding}s of common variables between the join children, with - * Values ordered according to the indicated {@link VariableOrder}. This class + * the {@link Binding}s of common variables between the join children. This class * represents a batch order to perform a given task on join BindingSet results. * The {@link Task} is to Add, Delete, or Update. 
This batch order is processed * by the {@link BatchObserver} and used with the nodeId provided to the @@ -54,7 +53,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater(); private VisibilityBindingSet bs; //update for join child indicated by side - private VariableOrder varOrder; //variable order for child indicated by Span private Side side; //join child that was updated by bs private JoinType join; /** @@ -63,20 +61,18 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { * @param column - Column of join child to be scanned * @param span - span of join child to be scanned (derived from common variables of left and right join children) * @param bs - BindingSet to be joined with results of child scan - * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span) * @param side - The side of the child that the VisibilityBindingSet update occurred at * @param join - JoinType (left, right, natural inner) */ - public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { + public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) { super(batchSize, task, column, span); this.bs = Preconditions.checkNotNull(bs); - this.varOrder = Preconditions.checkNotNull(varOrder); this.side = Preconditions.checkNotNull(side); this.join = Preconditions.checkNotNull(join); } - public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { - this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join); + public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) { + this(DEFAULT_BATCH_SIZE, task, column, span, bs, 
side, join); } /** @@ -95,13 +91,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { return join; } - /** - * Returns the VariableOrder for the join child corresponding to the Span. - * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s. - */ - public VariableOrder getVarOrder() { - return varOrder; - } /** * Sets the VisibilityBindingSet that represents an update to the join child. The join child @@ -129,7 +118,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { .append(" Batch Size: " + super.getBatchSize() + "\n") .append(" Task: " + super.getTask() + "\n") .append(" Column: " + super.getColumn() + "\n") - .append(" VariableOrder: " + varOrder + "\n") .append(" Join Type: " + join + "\n") .append(" Join Side: " + side + "\n") .append(" Binding Set: " + bs + "\n") @@ -149,12 +137,12 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { JoinBatchInformation batch = (JoinBatchInformation) other; return super.equals(other) && Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join) - && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder); + && Objects.equals(this.side, batch.side); } @Override public int hashCode() { - return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder); + return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side); } @@ -169,7 +157,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { private Column column; private Span span; private VisibilityBindingSet bs; - private VariableOrder varOrder; private JoinType join; private Side side; @@ -237,19 +224,10 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { } /** - * Sets the variable order for the join child corresponding to the Span - * @param varOrder - Variable order used to join 
BindingSet with result of scan - */ - public Builder setVarOrder(VariableOrder varOrder) { - this.varOrder = varOrder; - return this; - } - - /** * @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder */ public JoinBatchInformation build() { - return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join); + return new JoinBatchInformation(batchSize, task, column, span, bs, side, join); } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java index 9f3f1a6..f42a2ba 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java @@ -60,7 +60,6 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow())); result.add("startInc", new JsonPrimitive(span.isStartInclusive())); result.add("endInc", new JsonPrimitive(span.isEndInclusive())); - result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders()))); result.add("side", new JsonPrimitive(batch.getSide().name())); result.add("joinType", new JsonPrimitive(batch.getJoinType().name())); String updateVarOrderString = 
Joiner.on(";").join(batch.getBs().getBindingNames()); @@ -82,12 +81,11 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch boolean startInc = json.get("startInc").getAsBoolean(); boolean endInc = json.get("endInc").getAsBoolean(); Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc); - VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString()); VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString()); VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder); Side side = Side.valueOf(json.get("side").getAsString()); JoinType join = JoinType.valueOf(json.get("joinType").getAsString()); - return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder) + return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs) .setSide(side).setJoinType(join).build(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 8cd25d0..2eae4ff 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -108,6 +108,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr> 
* <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr> * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> @@ -238,6 +239,7 @@ public class FluoQueryColumns { public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId"); public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId"); public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId"); + public static final Column JOIN_BATCH_SIZE = new Column(JOIN_METADATA_CF, "joinBatchSize"); public static final Column JOIN_BINDING_SET = new Column(JOIN_METADATA_CF, "bindingSet"); // Statement Pattern Metadata columns. 
@@ -340,6 +342,7 @@ public class FluoQueryColumns { JOIN_TYPE, JOIN_PARENT_NODE_ID, JOIN_LEFT_CHILD_NODE_ID, + JOIN_BATCH_SIZE, JOIN_RIGHT_CHILD_NODE_ID)), /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 5ba7383..1c34836 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -385,6 +385,7 @@ public class FluoQueryMetadataDAO { tx.set(rowId, FluoQueryColumns.JOIN_TYPE, metadata.getJoinType().toString() ); tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, metadata.getParentNodeId() ); tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, metadata.getLeftChildNodeId() ); + tx.set(rowId, FluoQueryColumns.JOIN_BATCH_SIZE, Integer.toString(metadata.getJoinBatchSize())); tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, metadata.getRightChildNodeId() ); } @@ -410,6 +411,7 @@ public class FluoQueryMetadataDAO { FluoQueryColumns.JOIN_TYPE, FluoQueryColumns.JOIN_PARENT_NODE_ID, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, + FluoQueryColumns.JOIN_BATCH_SIZE, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID); // Return an object holding them. 
@@ -421,12 +423,14 @@ public class FluoQueryMetadataDAO { final String parentNodeId = values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID); final String leftChildNodeId = values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID); + final int joinBatchSize = Integer.parseInt(values.get(FluoQueryColumns.JOIN_BATCH_SIZE)); final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID); return JoinMetadata.builder(nodeId) .setVarOrder(varOrder) .setJoinType(joinType) .setParentNodeId(parentNodeId) + .setJoinBatchSize(joinBatchSize) .setLeftChildNodeId(leftChildNodeId) .setRightChildNodeId(rightChildNodeId); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java index aa79daf..d6c488b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; /** * Metadata that is specific to Join nodes. @@ -49,6 +50,9 @@ public class JoinMetadata extends CommonNodeMetadata { private final String parentNodeId; private final String leftChildNodeId; private final String rightChildNodeId; + private int joinBatchSize; + + public static final int DEFAULT_JOIN_BATCH_SIZE = 5000; /** * Constructs an instance of {@link JoinMetadata}. 
@@ -59,6 +63,7 @@ public class JoinMetadata extends CommonNodeMetadata { * @param parentNodeId - The node id of this node's parent. (not null) * @param leftChildNodeId - One of the nodes whose results are being joined. (not null) * @param rightChildNodeId - The other node whose results are being joined. (not null) + * @param joinBatchSize - Batch size used to process large joins */ public JoinMetadata( final String nodeId, @@ -66,12 +71,15 @@ public class JoinMetadata extends CommonNodeMetadata { final JoinType joinType, final String parentNodeId, final String leftChildNodeId, - final String rightChildNodeId) { + final String rightChildNodeId, + final int joinBatchSize) { super(nodeId, varOrder); this.joinType = checkNotNull(joinType); this.parentNodeId = checkNotNull(parentNodeId); this.leftChildNodeId = checkNotNull(leftChildNodeId); this.rightChildNodeId = checkNotNull(rightChildNodeId); + Preconditions.checkArgument(joinBatchSize > 0); + this.joinBatchSize = joinBatchSize; } /** @@ -101,6 +109,13 @@ public class JoinMetadata extends CommonNodeMetadata { public String getRightChildNodeId() { return rightChildNodeId; } + + /** + * @return - Batch size used to process large joins + */ + public int getJoinBatchSize() { + return joinBatchSize; + } @Override public int hashCode() { @@ -110,6 +125,7 @@ public class JoinMetadata extends CommonNodeMetadata { joinType, parentNodeId, leftChildNodeId, + joinBatchSize, rightChildNodeId); } @@ -127,6 +143,7 @@ public class JoinMetadata extends CommonNodeMetadata { .append(parentNodeId, joinMetadata.parentNodeId) .append(leftChildNodeId, joinMetadata.leftChildNodeId) .append(rightChildNodeId, joinMetadata.rightChildNodeId) + .append(joinBatchSize, joinMetadata.joinBatchSize) .isEquals(); } return false; @@ -145,6 +162,7 @@ public class JoinMetadata extends CommonNodeMetadata { .append(" Parent Node ID: " + parentNodeId + "\n") .append(" Left Child Node ID: " + leftChildNodeId + "\n") .append(" Right Child Node ID: " + 
rightChildNodeId + "\n") + .append(" Join Batch Size: " + joinBatchSize + "\n") .append("}") .toString(); } @@ -171,6 +189,7 @@ public class JoinMetadata extends CommonNodeMetadata { private String parentNodeId; private String leftChildNodeId; private String rightChildNodeId; + private int joinBatchSize = DEFAULT_JOIN_BATCH_SIZE; /** * Constructs an instance of {@link Builder}. @@ -248,6 +267,16 @@ public class JoinMetadata extends CommonNodeMetadata { return this; } + /** + * Sets the batch size used to process large joins. + * @param joinBatchSize - batch size used to process large joins + * @return This builder so that method invocation could be chained. + */ + public Builder setJoinBatchSize(int joinBatchSize) { + this.joinBatchSize = joinBatchSize; + return this; + } + public String getLeftChildNodeId() { return leftChildNodeId; } @@ -266,7 +295,8 @@ public class JoinMetadata extends CommonNodeMetadata { joinType, parentNodeId, leftChildNodeId, - rightChildNodeId); + rightChildNodeId, + joinBatchSize); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java index fe130fb..e46b405 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Optional; import java.util.Set; import 
org.apache.commons.lang3.builder.EqualsBuilder; @@ -28,6 +29,7 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -48,6 +50,7 @@ public class QueryMetadata extends CommonNodeMetadata { private final Set<ExportStrategy> exportStrategy; private final QueryType queryType; private final String exportId; + /** * Constructs an instance of {@link QueryMetadata}. @@ -109,6 +112,7 @@ public class QueryMetadata extends CommonNodeMetadata { return queryType; } + @Override public int hashCode() { return Objects.hashCode( @@ -178,6 +182,8 @@ public class QueryMetadata extends CommonNodeMetadata { private String childNodeId; private Set<ExportStrategy> exportStrategies; private QueryType queryType; + private Optional<Integer> joinBatchSize = Optional.empty(); + /** * Constructs an instance of {@link Builder}. @@ -267,6 +273,22 @@ public class QueryMetadata extends CommonNodeMetadata { public String getChildNodeId() { return childNodeId; } + + /** + * Sets batch size used to process joins for this query + * @param joinBatchSize - batch size used to process joins + */ + public Builder setJoinBatchSize(Optional<Integer> joinBatchSize) { + this.joinBatchSize = joinBatchSize; + return this; + } + + /** + * @return Optional containing the batch size used to process large joins + */ + public Optional<Integer> getJoinBatchSize() { + return joinBatchSize; + } /** * @return An instance of {@link QueryMetadata} build using this builder's values. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 6c03be1..7bf6f45 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -102,6 +102,8 @@ public class SparqlFluoQueryBuilder { private TupleExpr te; private String queryId; private NodeIds nodeIds; + private Optional<Integer> joinBatchSize = Optional.empty(); + //Default behavior is to export to Kafka - subject to change when user can //specify their own export strategy private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka)); @@ -137,6 +139,12 @@ public class SparqlFluoQueryBuilder { return this; } + public SparqlFluoQueryBuilder setJoinBatchSize(int joinBatchSize) { + Preconditions.checkArgument(joinBatchSize > 0); + this.joinBatchSize = Optional.of(joinBatchSize); + return this; + } + public FluoQuery build() { Preconditions.checkNotNull(sparql); Preconditions.checkNotNull(queryId); @@ -167,6 +175,7 @@ public class SparqlFluoQueryBuilder { queryBuilder.setSparql(sparql); queryBuilder.setChildNodeId(childNodeId); queryBuilder.setExportStrategies(exportStrategies); + queryBuilder.setJoinBatchSize(joinBatchSize); fluoQueryBuilder.setQueryMetadata(queryBuilder); setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId); @@ -427,6 +436,9 @@ public class SparqlFluoQueryBuilder { 
joinBuilder.setJoinType(joinType); joinBuilder.setLeftChildNodeId( leftChildNodeId ); joinBuilder.setRightChildNodeId( rightChildNodeId ); + if(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().isPresent()) { + joinBuilder.setJoinBatchSize(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().get()); + } // Figure out the variable order for each child node's binding set and // store it. Also store that each child node's parent is this join. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java index 303f9bb..ac41160 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java @@ -59,4 +59,16 @@ public class FluoQueryUtils { return queryIdParts[1]; } + /** + * Uses a {@link NodeIdCollector} visitor to do a pre-order traverse of the + * FluoQuery and gather the nodeIds of the metadata nodes. 
+ * @param query - FluoQuery to be traversed + * @return - List of nodeIds, ordered according to the pre-order traversal of the FluoQuery + */ + public static List<String> collectNodeIds(FluoQuery query) { + NodeIdCollector collector = new NodeIdCollector(query); + collector.visit(); + return collector.getNodeIds(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java new file mode 100644 index 0000000..6d374d8 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadataVisitorBase; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; + +/** + * A visitor that does a pre-order traversal of the FluoQuery and + * collects the ids of metadata query nodes along the way. + * + */ +public class NodeIdCollector extends QueryMetadataVisitorBase { + + List<String> ids; + + public NodeIdCollector(FluoQuery fluoQuery ) { + super(fluoQuery); + ids = new ArrayList<>(); + } + + public List<String> getNodeIds() { + return ids; + } + + public void visit(QueryMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(ProjectionMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(ConstructQueryMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(FilterMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(JoinMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(StatementPatternMetadata metadata) { + ids.add(metadata.getNodeId()); + } + + public void visit(PeriodicQueryMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public 
void visit(AggregationMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java index fe89325..210aa0c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java @@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; import static org.junit.Assert.assertEquals; -import java.util.Arrays; import java.util.Optional; import org.apache.fluo.api.data.Bytes; @@ -32,7 +31,6 @@ import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.junit.Test; import org.openrdf.model.impl.URIImpl; @@ -62,8 +60,7 @@ public class BatchInformationSerializerTest { JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update) .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346"))) - 
.setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b"))) - .setBs(vBis).build(); + .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setBs(vBis).build(); byte[] batchBytes = BatchInformationSerializer.toBytes(batch); Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java index b432868..64504ca 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java @@ -79,25 +79,21 @@ public class QueryBuilderVisitorTest { } public void visit(QueryMetadata.Builder queryBuilder) { - System.out.println(queryBuilder.getNodeId()); ids.add(queryBuilder.getNodeId()); super.visit(queryBuilder); } public void visit(ProjectionMetadata.Builder projectionBuilder) { - System.out.println(projectionBuilder.getNodeId()); ids.add(projectionBuilder.getNodeId()); super.visit(projectionBuilder); } public void visit(JoinMetadata.Builder joinBuilder) { - System.out.println(joinBuilder.getNodeId()); ids.add(joinBuilder.getNodeId()); super.visit(joinBuilder); } public void visit(StatementPatternMetadata.Builder statementBuilder) { - System.out.println(statementBuilder.getNodeId()); ids.add(statementBuilder.getNodeId()); } } 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java deleted file mode 100644 index 1707308..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.integration; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.log4j.Logger; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; -import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; -import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; -import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; -import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; -import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; -import org.junit.Test; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -public class BatchDeleteIT extends RyaExportITBase { - - private static final Logger log = Logger.getLogger(BatchDeleteIT.class); - private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); - - @Test - public void simpleScanDelete() throws Exception { - - final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " - + " <urn:predicate_2> ?object2 } "; - try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { - - RyaURI subj = new RyaURI("urn:subject_1"); - RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); - RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); - Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); - Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); - - List<String> ids = getNodeIdStrings(fluoClient, queryId); - List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); - - // Stream the data into Fluo. 
- InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); - - // Verify the end results of the query match the expected results. - getMiniFluo().waitForObservers(); - - verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10)); - - createSpanBatches(fluoClient, ids, prefixes, 10); - getMiniFluo().waitForObservers(); - - verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0)); - } - } - - @Test - public void simpleJoinDelete() throws Exception { - final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " - + " <urn:predicate_2> ?object2 } "; - try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { - - RyaURI subj = new RyaURI("urn:subject_1"); - RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); - RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); - Set<RyaStatement> statements1 = getRyaStatements(statement1, 5); - Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. 
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); - - List<String> ids = getNodeIdStrings(fluoClient, queryId); - String joinId = ids.get(1); - String rightSp = ids.get(3); - QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("subject", new URIImpl("urn:subject_1")); - bs.addBinding("object1", new URIImpl("urn:object_0")); - VisibilityBindingSet vBs = new VisibilityBindingSet(bs); - Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); - VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2")); - - // Stream the data into Fluo. - InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); - - getMiniFluo().waitForObservers(); - verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5)); - - JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) - .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete) - .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build(); - // Verify the end results of the query match the expected results. 
- createSpanBatch(fluoClient, joinId, batch); - - getMiniFluo().waitForObservers(); - verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5)); - } - } - - @Test - public void simpleJoinAdd() throws Exception { - final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " - + " <urn:predicate_2> ?object2 } "; - try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { - - RyaURI subj = new RyaURI("urn:subject_1"); - RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); - Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); - - List<String> ids = getNodeIdStrings(fluoClient, queryId); - String joinId = ids.get(1); - String rightSp = ids.get(3); - QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("subject", new URIImpl("urn:subject_1")); - bs.addBinding("object1", new URIImpl("urn:object_0")); - VisibilityBindingSet vBs = new VisibilityBindingSet(bs); - Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); - VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2")); - - // Stream the data into Fluo. 
- InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); - - getMiniFluo().waitForObservers(); - verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5)); - - JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) - .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add) - .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build(); - // Verify the end results of the query match the expected results. - createSpanBatch(fluoClient, joinId, batch); - - getMiniFluo().waitForObservers(); - verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5)); - } - } - - private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) { - - Set<RyaStatement> statements = new HashSet<>(); - final String subject = "urn:subject_"; - final String predicate = "urn:predicate_"; - final String object = "urn:object_"; - - for (int i = 0; i < numTriples; i++) { - RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject()); - if (stmnt.getSubject() == null) { - stmnt.setSubject(new RyaURI(subject + i)); - } - if (stmnt.getPredicate() == null) { - stmnt.setPredicate(new RyaURI(predicate + i)); - } - if (stmnt.getObject() == null) { - stmnt.setObject(new RyaURI(object + i)); - } - statements.add(stmnt); - } - return statements; - } - - private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) { - List<String> nodeStrings = new ArrayList<>(); - try (Snapshot sx = fluoClient.newSnapshot()) { - FluoQuery query = dao.readFluoQuery(sx, queryId); - nodeStrings.add(queryId); - Collection<JoinMetadata> jMeta = query.getJoinMetadata(); - for (JoinMetadata meta : jMeta) { - nodeStrings.add(meta.getNodeId()); - nodeStrings.add(meta.getLeftChildNodeId()); - nodeStrings.add(meta.getRightChildNodeId()); - } - } - return nodeStrings; - } - - private void 
createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) { - - Preconditions.checkArgument(ids.size() == prefixes.size()); - - try (Transaction tx = fluoClient.newTransaction()) { - for (int i = 0; i < ids.size(); i++) { - String id = ids.get(i); - String bsPrefix = prefixes.get(i); - NodeType type = NodeType.fromNodeId(id).get(); - Column bsCol = type.getResultColumn(); - String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix; - Span span = Span.prefix(Bytes.of(row)); - BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span) - .build(); - BatchInformationDAO.addBatch(tx, id, batch); - } - tx.commit(); - } - } - - private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) { - try (Transaction tx = fluoClient.newTransaction()) { - BatchInformationDAO.addBatch(tx, nodeId, batch); - tx.commit(); - } - } - - private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) { - try (Transaction tx = fluoClient.newTransaction()) { - int count = 0; - RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build(); - Iterator<ColumnScanner> colScanners = scanner.iterator(); - while (colScanners.hasNext()) { - ColumnScanner colScanner = colScanners.next(); - Iterator<ColumnValue> vals = colScanner.iterator(); - while (vals.hasNext()) { - vals.next(); - count++; - } - } - tx.commit(); - return count; - } - } - - private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) { - Preconditions.checkArgument(ids.size() == expectedCounts.size()); - for (int i = 0; i < ids.size(); i++) { - String id = ids.get(i); - int expected = expectedCounts.get(i); - NodeType type = NodeType.fromNodeId(id).get(); - int count = countResults(fluoClient, id, type.getResultColumn()); - log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected); - switch 
(type) { - case STATEMENT_PATTERN: - assertEquals(expected, count); - break; - case JOIN: - assertEquals(expected, count); - break; - case QUERY: - assertEquals(expected, count); - break; - default: - break; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java new file mode 100644 index 0000000..32d0e41 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +public class BatchIT extends RyaExportITBase { + + private static final Logger log = Logger.getLogger(BatchIT.class); + private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + @Test + public void simpleScanDelete() throws Exception { + + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); + + // Stream the data into Fluo. 
+ InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + // Verify the end results of the query match the expected results. + getMiniFluo().waitForObservers(); + + verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 10, 10)); + + createSpanBatches(fluoClient, ids, prefixes, 10); + getMiniFluo().waitForObservers(); + + verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 0)); + } + } + + @Test + public void simpleJoinDelete() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements1 = getRyaStatements(statement1, 5); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. 
+ String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + String joinId = ids.get(2); + String rightSp = ids.get(4); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("subject", new URIImpl("urn:subject_1")); + bs.addBinding("object1", new URIImpl("urn:object_0")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + + // Stream the data into Fluo. + InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5)); + + JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) + .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete) + .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build(); + // Verify the end results of the query match the expected results. + createSpanBatch(fluoClient, joinId, batch); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 20, 5, 5)); + } + } + + @Test + public void simpleJoinAdd() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); + + // Create the PCJ table. 
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + String joinId = ids.get(2); + String rightSp = ids.get(4); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("subject", new URIImpl("urn:subject_1")); + bs.addBinding("object1", new URIImpl("urn:object_0")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + + // Stream the data into Fluo. + InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5)); + + JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) + .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add) + .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build(); + // Verify the end results of the query match the expected results. 
+ createSpanBatch(fluoClient, joinId, batch); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 5, 0, 5)); + } + } + + @Test + public void joinBatchIntegrationTest() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + + Set<RyaStatement> statements1 = getRyaStatements(statement1, 15); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 15); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and + // batch size of joins to 5. + String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + + // Stream the data into Fluo. 
+ InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15)); + } + } + + + @Test + public void leftJoinBatchIntegrationTest() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + "OPTIONAL{ ?subject <urn:predicate_2> ?object2} } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + + subj = new RyaURI("urn:subject_2"); + RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); + Set<RyaStatement> statements3 = getRyaStatements(statement3, 10); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and + // batch size of joins to 5. + String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + + // Stream the data into Fluo. 
+ InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements3, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10)); + } + } + + + @Test + public void multiJoinBatchIntegrationTest() throws Exception { + final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 ." + + " ?subject2 <urn:predicate_3> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj1 = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null); + + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); + + RyaURI subj2 = new RyaURI("urn:subject_2"); + RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null); + Set<RyaStatement> statements3 = getRyaStatements(statement3, 10); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and + // batch size of joins to 5. + String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), + getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + + // Stream the data into Fluo. 
+ InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements3, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10)); + } + } + + + private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) { + + Set<RyaStatement> statements = new HashSet<>(); + final String subject = "urn:subject_"; + final String predicate = "urn:predicate_"; + final String object = "urn:object_"; + + for (int i = 0; i < numTriples; i++) { + RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject()); + if (stmnt.getSubject() == null) { + stmnt.setSubject(new RyaURI(subject + i)); + } + if (stmnt.getPredicate() == null) { + stmnt.setPredicate(new RyaURI(predicate + i)); + } + if (stmnt.getObject() == null) { + stmnt.setObject(new RyaURI(object + i)); + } + statements.add(stmnt); + } + return statements; + } + + private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) { + List<String> nodeStrings; + try (Snapshot sx = fluoClient.newSnapshot()) { + FluoQuery query = dao.readFluoQuery(sx, queryId); + nodeStrings = FluoQueryUtils.collectNodeIds(query); + } + return nodeStrings; + } + + private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) { + + Preconditions.checkArgument(ids.size() == prefixes.size()); + + try (Transaction tx = fluoClient.newTransaction()) { + for (int i = 0; i < ids.size(); i++) { + String id = ids.get(i); + String bsPrefix = prefixes.get(i); + NodeType type = NodeType.fromNodeId(id).get(); + Column bsCol = type.getResultColumn(); + String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix; + Span span = Span.prefix(Bytes.of(row)); + BatchInformation batch = 
SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span) + .build(); + BatchInformationDAO.addBatch(tx, id, batch); + } + tx.commit(); + } + } + + private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) { + try (Transaction tx = fluoClient.newTransaction()) { + BatchInformationDAO.addBatch(tx, nodeId, batch); + tx.commit(); + } + } + + private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) { + try (Transaction tx = fluoClient.newTransaction()) { + int count = 0; + RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build(); + Iterator<ColumnScanner> colScanners = scanner.iterator(); + while (colScanners.hasNext()) { + ColumnScanner colScanner = colScanners.next(); + Iterator<ColumnValue> vals = colScanner.iterator(); + while (vals.hasNext()) { + vals.next(); + count++; + } + } + tx.commit(); + return count; + } + } + + private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) { + Preconditions.checkArgument(ids.size() == expectedCounts.size()); + for (int i = 0; i < ids.size(); i++) { + String id = ids.get(i); + int expected = expectedCounts.get(i); + NodeType type = NodeType.fromNodeId(id).get(); + int count = countResults(fluoClient, id, type.getResultColumn()); + log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected); + switch (type) { + case STATEMENT_PATTERN: + assertEquals(expected, count); + break; + case JOIN: + assertEquals(expected, count); + break; + case QUERY: + assertEquals(expected, count); + break; + default: + break; + } + } + } + +}