http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java new file mode 100644 index 0000000..581aa5b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java @@ -0,0 +1,68 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.UUID; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; + +import com.google.common.base.Preconditions; + +/** + * Class for creating the {@link Byte}s written to the Fluo Row used to identify each {@link BatchInformation} + * object. Each Byte row is formed by concatenating a query id and a batch id. 
+ * + */ +public class BatchRowKeyUtil { + + /** + * Creates a Byte row form the query id. The batch id is automatically generated/ + * @param nodeId + * @return Byte row used to identify the BatchInformation + */ + public static Bytes getRow(String nodeId) { + String row = new StringBuilder().append(nodeId).append(IncrementalUpdateConstants.NODEID_BS_DELIM) + .append(UUID.randomUUID().toString().replace("-", "")).toString(); + return Bytes.of(row); + } + + /** + * Creates a Byte row from a nodeId and batchId + * @param nodeId - query node id that batch task will be performed on + * @param batchId - id used to identify batch + * @return Byte row used to identify the BatchInformation + */ + public static Bytes getRow(String nodeId, String batchId) { + String row = new StringBuilder().append(nodeId).append(IncrementalUpdateConstants.NODEID_BS_DELIM) + .append(batchId).toString(); + return Bytes.of(row); + } + + /** + * Given a Byte row, return the query node Id + * @param row - the Byte row used to identify the BatchInformation + * @return - the queryId that the batch task is performed on + */ + public static String getNodeId(Bytes row) { + String[] stringArray = row.toString().split(IncrementalUpdateConstants.NODEID_BS_DELIM);; + Preconditions.checkArgument(stringArray.length == 2); + return stringArray[0]; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java new file mode 100644 index 0000000..a266341 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java @@ -0,0 +1,184 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.util.HashSet; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + +import com.google.common.base.Preconditions; + +/** + * Performs updates to BindingSets in the JoinBindingSet column in batch fashion. + */ +public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { + + private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + /** + * Processes {@link JoinBatchInformation}. Updates the BindingSets + * associated with the specified nodeId. 
The BindingSets are processed in + * batch fashion, where the number of results is indicated by + * {@link JoinBatchInformation#getBatchSize()}. BindingSets are either + * Added, Deleted, or Updated according to + * {@link JoinBatchInformation#getTask()}. In the event that the number of + * entries that need to be updated exceeds the batch size, the row of the + * first unprocessed BindingSets is used to create a new JoinBatch job to + * process the remaining BindingSets. + * @throws Exception + */ + @Override + public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { + super.processBatch(tx, row, batch); + String nodeId = BatchRowKeyUtil.getNodeId(row); + Preconditions.checkArgument(batch instanceof JoinBatchInformation); + JoinBatchInformation joinBatch = (JoinBatchInformation) batch; + Task task = joinBatch.getTask(); + + // Figure out which join algorithm we are going to use. + final IterativeJoin joinAlgorithm; + switch (joinBatch.getJoinType()) { + case NATURAL_JOIN: + joinAlgorithm = new NaturalJoin(); + break; + case LEFT_OUTER_JOIN: + joinAlgorithm = new LeftOuterJoin(); + break; + default: + throw new RuntimeException("Unsupported JoinType: " + joinBatch.getJoinType()); + } + + Set<VisibilityBindingSet> bsSet = new HashSet<>(); + Optional<RowColumn> rowCol = fillSiblingBatch(tx, joinBatch, bsSet); + + // Iterates over the resulting BindingSets from the join. + final Iterator<VisibilityBindingSet> newJoinResults; + VisibilityBindingSet bs = joinBatch.getBs(); + if (joinBatch.getSide() == Side.LEFT) { + newJoinResults = joinAlgorithm.newLeftResult(bs, bsSet.iterator()); + } else { + newJoinResults = joinAlgorithm.newRightResult(bsSet.iterator(), bs); + } + + // Insert the new join binding sets to the Fluo table. 
+ final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, nodeId); + final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); + while (newJoinResults.hasNext()) { + final VisibilityBindingSet newJoinResult = newJoinResults.next(); + //create BindingSet value + Bytes bsBytes = BS_SERDE.serialize(newJoinResult); + //make rowId + Bytes rowKey = RowKeyUtil.makeRowKey(nodeId, joinVarOrder, newJoinResult); + final Column col = FluoQueryColumns.JOIN_BINDING_SET; + processTask(tx, task, rowKey, col, bsBytes); + } + + // if batch limit met, there are additional entries to process + // update the span and register updated batch job + if (rowCol.isPresent()) { + Span newSpan = getNewSpan(rowCol.get(), joinBatch.getSpan()); + joinBatch.setSpan(newSpan); + BatchInformationDAO.addBatch(tx, nodeId, joinBatch); + } + + } + + private void processTask(TransactionBase tx, Task task, Bytes row, Column column, Bytes value) { + switch (task) { + case Add: + tx.set(row, column, value); + break; + case Delete: + tx.delete(row, column); + break; + case Update: + log.trace("The Task Update is not supported for JoinBatchBindingSetUpdater. Batch will not be processed."); + break; + default: + log.trace("Invalid Task type. Aborting batch operation."); + break; + } + } + + /** + * Fetches batch to be processed by scanning over the Span specified by the + * {@link JoinBatchInformation}. The number of results is less than or equal + * to the batch size specified by the JoinBatchInformation. + * + * @param tx - Fluo transaction in which batch operation is performed + * @param batch - batch order to be processed + * @param bsSet- set that batch results are added to + * @return Set - containing results of sibling scan. 
+ * @throws Exception + */ + private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception { + + Span span = batch.getSpan(); + Column column = batch.getColumn(); + int batchSize = batch.getBatchSize(); + + RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build(); + Iterator<ColumnScanner> colScannerIter = rs.iterator(); + + boolean batchLimitMet = false; + Bytes row = span.getStart().getRow(); + while (colScannerIter.hasNext() && !batchLimitMet) { + ColumnScanner colScanner = colScannerIter.next(); + row = colScanner.getRow(); + Iterator<ColumnValue> iter = colScanner.iterator(); + while (iter.hasNext()) { + if (bsSet.size() >= batchSize) { + batchLimitMet = true; + break; + } + bsSet.add(BS_SERDE.deserialize(iter.next().getValue())); + } + } + + if (batchLimitMet) { + return Optional.of(new RowColumn(row, column)); + } else { + return Optional.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java new file mode 100644 index 0000000..71ac557 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java @@ -0,0 +1,255 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.Objects;

import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.openrdf.query.Binding;

/**
 * This class updates join results based on parameters specified for the join's
 * children. The join has two children, and for one child a VisibilityBindingSet
 * is specified along with the Side of that child. This BindingSet represents an
 * update to that join child. For the other child, a Span, Column and
 * VariableOrder are specified. This is so that the sibling node (the node that
 * wasn't updated) can be scanned to obtain results that can be joined with the
 * VisibilityBindingSet. The assumption here is that the Span is derived from
 * the {@link Binding}s of common variables between the join children, with
 * Values ordered according to the indicated {@link VariableOrder}. This class
 * represents a batch order to perform a given task on join BindingSet results.
 * The {@link Task} is to Add, Delete, or Update. This batch order is processed
 * by the {@link BatchObserver} and used with the nodeId provided to the
 * Observer to process the Task specified by the batch order. If the Task is to
 * add, the BatchBindingSetUpdater returned by
 * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for
 * results using the indicated Span and Column. These results are joined with
 * the indicated VisibilityBindingSet, and the results are added to the parent
 * join. The other Tasks are performed analogously.
 */
public class JoinBatchInformation extends AbstractSpanBatchInformation {

    private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
    private final VisibilityBindingSet bs; // update for join child indicated by side
    private final VariableOrder varOrder; // variable order for child indicated by Span
    private final Side side; // join child that was updated by bs
    private final JoinType join;

    /**
     * @param batchSize - batch size that Tasks are performed in
     * @param task - Add, Delete, or Update
     * @param column - Column of join child to be scanned
     * @param span - span of join child to be scanned (derived from common variables of left and right join children)
     * @param bs - BindingSet to be joined with results of child scan
     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
     * @param side - The side of the child that the VisibilityBindingSet update occurred at
     * @param join - JoinType of the join (for example a natural join or a left outer join)
     * @throws NullPointerException if {@code bs}, {@code varOrder}, {@code side} or {@code join} is null
     */
    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
        super(batchSize, task, column, span);
        this.bs = Objects.requireNonNull(bs, "bs");
        this.varOrder = Objects.requireNonNull(varOrder, "varOrder");
        this.side = Objects.requireNonNull(side, "side");
        this.join = Objects.requireNonNull(join, "join");
    }

    /**
     * Creates a JoinBatchInformation that uses {@code DEFAULT_BATCH_SIZE} as its batch size.
     * See {@link #JoinBatchInformation(int, Task, Column, Span, VisibilityBindingSet, VariableOrder, Side, JoinType)}
     * for a description of the remaining parameters.
     */
    public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
        this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join);
    }

    /**
     * Indicates the join child that the BindingSet result {@link JoinBatchInformation#getBs()} updated.
     * This BindingSet is joined with the results obtained by scanning over the value of {@link JoinBatchInformation#getSpan()}.
     * @return {@link Side} indicating which side of the join the new result occurred on
     */
    public Side getSide() {
        return side;
    }

    /**
     * @return {@link JoinType} indicating the type of join (for example natural or left outer join)
     */
    public JoinType getJoinType() {
        return join;
    }

    /**
     * Returns the VariableOrder for the join child corresponding to the Span.
     * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s.
     */
    public VariableOrder getVarOrder() {
        return varOrder;
    }

    /**
     * Returns the VisibilityBindingSet that represents an update to the join child. The join child
     * updated is indicated by the value of {@link JoinBatchInformation#getSide()}.
     * @return VisibilityBindingSet that will be joined with results returned by scan over given
     * {@link Span}.
     */
    public VisibilityBindingSet getBs() {
        return bs;
    }

    /**
     * @return BatchBindingSetUpdater used to apply {@link Task} to results formed by joining the given
     * VisibilityBindingSet with the results returned by scanning over the Span.
     */
    @Override
    public BatchBindingSetUpdater getBatchUpdater() {
        return updater;
    }

    @Override
    public String toString() {
        return new StringBuilder()
                .append("Join Batch Information {\n")
                .append(" Batch Size: ").append(super.getBatchSize()).append("\n")
                .append(" Task: ").append(super.getTask()).append("\n")
                .append(" Column: ").append(super.getColumn()).append("\n")
                .append(" Span: ").append(super.getSpan()).append("\n")
                .append(" VariableOrder: ").append(varOrder).append("\n")
                .append(" Join Type: ").append(join).append("\n")
                .append(" Join Side: ").append(side).append("\n")
                .append(" Binding Set: ").append(bs).append("\n")
                .append("}")
                .toString();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (!(other instanceof JoinBatchInformation)) {
            return false;
        }

        JoinBatchInformation batch = (JoinBatchInformation) other;
        return super.equals(other) && Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join)
                && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder);
    }

    /**
     * @return a new {@link Builder} for constructing a JoinBatchInformation
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link JoinBatchInformation}. The batch size defaults to
     * {@code DEFAULT_BATCH_SIZE} unless set explicitly.
     */
    public static class Builder {

        private int batchSize = DEFAULT_BATCH_SIZE;
        private Task task;
        private Column column;
        private Span span;
        private VisibilityBindingSet bs;
        private VariableOrder varOrder;
        private JoinType join;
        private Side side;

        /**
         * @param batchSize - batch size that {@link Task}s are performed in
         */
        public Builder setBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * @param task - Task performed (Add, Delete, Update)
         */
        public Builder setTask(Task task) {
            this.task = task;
            return this;
        }

        /**
         * @param column - Column of join child to be scanned
         */
        public Builder setColumn(Column column) {
            this.column = column;
            return this;
        }

        /**
         * Span to scan results for one child of the join. The Span corresponds to the side of
         * the join that is not indicated by Side. So if Side is Left, then the
         * Span will scan the right child of the join. It is assumed that the span is derived from
         * the common variables of the left and right join children.
         * @param span - Span over join child to be scanned
         */
        public Builder setSpan(Span span) {
            this.span = span;
            return this;
        }

        /**
         * Sets the BindingSet that corresponds to an update to the join child indicated
         * by Side.
         * @param bs - BindingSet update of join child to be joined with results of scan
         */
        public Builder setBs(VisibilityBindingSet bs) {
            this.bs = bs;
            return this;
        }

        /**
         * @param join - JoinType of the join (for example a natural join or a left outer join)
         */
        public Builder setJoinType(JoinType join) {
            this.join = join;
            return this;
        }

        /**
         * Indicates the join child corresponding to the VisibilityBindingSet update
         * @param side - side of join the child BindingSet update appeared at
         */
        public Builder setSide(Side side) {
            this.side = side;
            return this;
        }

        /**
         * Sets the variable order for the join child corresponding to the Span
         * @param varOrder - Variable order used to join BindingSet with result of scan
         */
        public Builder setVarOrder(VariableOrder varOrder) {
            this.varOrder = varOrder;
            return this;
        }

        /**
         * @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder
         * @throws NullPointerException if the BindingSet, VariableOrder, Side or JoinType was not set
         */
        public JoinBatchInformation build() {
            return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java new file mode 100644 index 0000000..749a77d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java @@ -0,0 +1,128 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.util.Iterator; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; + +import com.google.common.base.Preconditions; + +/** + * This class processes {@link SpanBatchDeleteInformation} objects by + * deleting the entries in the Fluo Column corresponding to the {@link Span} + * of the BatchInformation object. This class will delete entries until the + * batch size is met, and then create a new SpanBatchDeleteInformation object + * with an updated Span whose starting point is the stopping point of this + * batch. If the batch limit is not met, then a new batch is not created and + * the task is complete. + * + */ +public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { + + private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class); + + /** + * Process SpanBatchDeleteInformation objects by deleting all entries indicated + * by Span until batch limit is met. 
+ * @param tx - Fluo Transaction + * @param row - Byte row identifying BatchInformation + * @param batch - SpanBatchDeleteInformation object to be processed + */ + @Override + public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { + super.processBatch(tx, row, batch); + Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation); + SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch; + Task task = spanBatch.getTask(); + int batchSize = spanBatch.getBatchSize(); + Span span = spanBatch.getSpan(); + Column column = batch.getColumn(); + Optional<RowColumn> rowCol = Optional.empty(); + + switch (task) { + case Add: + log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed."); + break; + case Delete: + rowCol = deleteBatch(tx, span, column, batchSize); + break; + case Update: + log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed."); + break; + default: + log.trace("Invalid Task type. Aborting batch operation."); + break; + } + + if (rowCol.isPresent()) { + Span newSpan = getNewSpan(rowCol.get(), spanBatch.getSpan()); + log.trace("Batch size met. There are remaining results that need to be deleted. 
Creating a new batch of size: " + + spanBatch.getBatchSize() + " with Span: " + newSpan + " and Column: " + column); + spanBatch.setSpan(newSpan); + BatchInformationDAO.addBatch(tx, BatchRowKeyUtil.getNodeId(row), spanBatch); + } + } + + private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) { + + log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column); + RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build(); + try { + Iterator<ColumnScanner> colScannerIter = rs.iterator(); + + int count = 0; + boolean batchLimitMet = false; + Bytes row = span.getStart().getRow(); + while (colScannerIter.hasNext() && !batchLimitMet) { + ColumnScanner colScanner = colScannerIter.next(); + row = colScanner.getRow(); + Iterator<ColumnValue> iter = colScanner.iterator(); + while (iter.hasNext()) { + if (count >= batchSize) { + batchLimitMet = true; + break; + } + ColumnValue colVal = iter.next(); + tx.delete(row, colVal.getColumn()); + count++; + } + } + + if (batchLimitMet) { + return Optional.of(new RowColumn(row)); + } else { + return Optional.empty(); + } + } catch (Exception e) { + return Optional.empty(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java new file mode 100644 index 0000000..3b1e245 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java @@ -0,0 +1,95 @@ +package 
org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * This class represents a batch order to delete all entries in the Fluo table indicated + * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, + * which uses this batch information along with the nodeId passed into the Observer to perform + * batch deletes. 
+ * + */ +public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { + + private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); + + public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { + super(batchSize, Task.Delete, column, span); + } + + /** + * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} + */ + @Override + public BatchBindingSetUpdater getBatchUpdater() { + return updater; + } + + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private int batchSize = DEFAULT_BATCH_SIZE; + private Column column; + private Span span; + + /** + * @param batchSize - {@link Task}s are applied in batches of this size + */ + public Builder setBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + /** + * Sets column to apply batch {@link Task} to + * @param column - column batch Task will be applied to + * @return + */ + public Builder setColumn(Column column) { + this.column = column; + return this; + } + + /** + * @param span - span that batch {@link Task} will be applied to + * + */ + public Builder setSpan(Span span) { + this.span = span; + return this; + } + + + /** + * @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder + */ + public SpanBatchDeleteInformation build() { + return new SpanBatchDeleteInformation(batchSize, column, span); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java new file mode 100644 index 0000000..e6f69d0 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java @@ -0,0 +1,58 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Optional; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Serializer/Deserializer for {@link BatchInformation} objects that uses the Gson + * Type Adapter {@link BatchInformationTypeAdapter} to do all of the serializing and deserializing. 
+ * + * + */ +public class BatchInformationSerializer { + + private static Logger log = Logger.getLogger(BatchInformationSerializer.class); + private static Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(BatchInformation.class, new BatchInformationTypeAdapter()) + .create(); + + public static byte[] toBytes(BatchInformation arg0) { + try { + return gson.toJson(arg0).getBytes("UTF-8"); + } catch (Exception e) { + log.info("Unable to serialize BatchInformation: " + arg0); + throw new RuntimeException(e); + } + } + + public static Optional<BatchInformation> fromBytes(byte[] arg0) { + try { + String json = new String(arg0, "UTF-8"); + return Optional.of(gson.fromJson(json, BatchInformation.class)); + } catch (Exception e) { + log.info("Invalid String encoding. BatchInformation cannot be deserialized."); + return Optional.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java new file mode 100644 index 0000000..d7c15df --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java @@ -0,0 +1,73 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.lang.reflect.Type; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * JsonSerializer/JsonDeserializer for serializing/deserializing + * {@link BatchInformation} objects. This makes use of the + * {@link BatchInformationTypeAdapterFactory} to retrieve the appropriate + * JsonSerializer/JsonDeserializer given the class name of the particular + * implementation of BatchInformation. 
+ * + */ +public class BatchInformationTypeAdapter implements JsonSerializer<BatchInformation>, JsonDeserializer<BatchInformation> { + + private static final Logger log = Logger.getLogger(BatchInformationTypeAdapter.class); + private static final BatchInformationTypeAdapterFactory factory = new BatchInformationTypeAdapterFactory(); + + @Override + public BatchInformation deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException { + try { + JsonObject json = arg0.getAsJsonObject(); + String type = json.get("class").getAsString(); + JsonDeserializer<? extends BatchInformation> deserializer = factory.getDeserializerFromName(type); + return deserializer.deserialize(arg0, arg1, arg2); + } catch (Exception e) { + log.trace("Unable to deserialize JsonElement: " + arg0); + log.trace("Returning an empty Batch"); + throw new JsonParseException(e); + } + } + + @Override + public JsonElement serialize(BatchInformation batch, Type arg1, JsonSerializationContext arg2) { + JsonSerializer<? 
extends BatchInformation> serializer = factory.getSerializerFromName(batch.getClass().getName()); + + if(batch instanceof SpanBatchDeleteInformation) { + return ((SpanBatchInformationTypeAdapter) serializer).serialize((SpanBatchDeleteInformation) batch, arg1, arg2); + } else { + return ((JoinBatchInformationTypeAdapter) serializer).serialize((JoinBatchInformation) batch, arg1, arg2); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java new file mode 100644 index 0000000..0221bc2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java @@ -0,0 +1,65 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Map; + +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonSerializer; + +/** + * Factory the uses class names to return the appropriate {@link JsonSerializer} and {@link JsonDeserializer} for serializing + * and deserializing {@link BatchInformation} objects. + * + */ +public class BatchInformationTypeAdapterFactory { + + /** + * Retrieve the appropriate {@link JsonSerializer} using the class name of the {@link BatchInformation} implementation + * @param name - class name of the BatchInformation object + * @return JsonSerializer for serializing BatchInformation objects + */ + public JsonSerializer<? extends BatchInformation> getSerializerFromName(String name) { + return serializers.get(name); + } + + /** + * Retrieve the appropriate {@link JsonDeserializer} using the class name of the {@link BatchInformation} implementation + * @param name - class name of the BatchInformation object + * @return JsonDeserializer for deserializing BatchInformation objects + */ + public JsonDeserializer<? extends BatchInformation> getDeserializerFromName(String name) { + return deserializers.get(name); + } + + static final Map<String, JsonSerializer<? extends BatchInformation>> serializers = ImmutableMap.of( + SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter(), + JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter() + ); + + static final Map<String, JsonDeserializer<? 
extends BatchInformation>> deserializers = ImmutableMap.of( + SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter(), + JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter() + ); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java new file mode 100644 index 0000000..9f3f1a6 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java @@ -0,0 +1,94 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.lang.reflect.Type; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; + +import com.google.common.base.Joiner; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * JsonSerializer/JsonDeserializer to serialize/deserialize {@link JoinBatchInformation} objects. 
+ * + */ +public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatchInformation>, JsonDeserializer<JoinBatchInformation> { + + private static final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter(); + + @Override + public JsonElement serialize(JoinBatchInformation batch, Type typeOfSrc, JsonSerializationContext context) { + JsonObject result = new JsonObject(); + result.add("class", new JsonPrimitive(batch.getClass().getName())); + result.add("batchSize", new JsonPrimitive(batch.getBatchSize())); + result.add("task", new JsonPrimitive(batch.getTask().name())); + Column column = batch.getColumn(); + result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier())); + Span span = batch.getSpan(); + result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow())); + result.add("startInc", new JsonPrimitive(span.isStartInclusive())); + result.add("endInc", new JsonPrimitive(span.isEndInclusive())); + result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders()))); + result.add("side", new JsonPrimitive(batch.getSide().name())); + result.add("joinType", new JsonPrimitive(batch.getJoinType().name())); + String updateVarOrderString = Joiner.on(";").join(batch.getBs().getBindingNames()); + VariableOrder updateVarOrder = new VariableOrder(updateVarOrderString); + result.add("bindingSet", new JsonPrimitive(converter.convert(batch.getBs(), updateVarOrder))); + result.add("updateVarOrder", new JsonPrimitive(updateVarOrderString)); + return result; + } + + @Override + public JoinBatchInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + JsonObject json = element.getAsJsonObject(); + int batchSize = json.get("batchSize").getAsInt(); + Task task = Task.valueOf(json.get("task").getAsString()); + String[] colArray = 
json.get("column").getAsString().split("\u0000"); + Column column = new Column(colArray[0], colArray[1]); + String[] rows = json.get("span").getAsString().split("\u0000"); + boolean startInc = json.get("startInc").getAsBoolean(); + boolean endInc = json.get("endInc").getAsBoolean(); + Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc); + VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString()); + VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString()); + VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder); + Side side = Side.valueOf(json.get("side").getAsString()); + JoinType join = JoinType.valueOf(json.get("joinType").getAsString()); + return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder) + .setSide(side).setJoinType(join).build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java new file mode 100644 index 0000000..98deb8e --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java @@ -0,0 +1,69 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.lang.reflect.Type; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects. 
+ * + */ +public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> { + + @Override + public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + JsonObject json = element.getAsJsonObject(); + int batchSize = json.get("batchSize").getAsInt(); + String[] colArray = json.get("column").getAsString().split("\u0000"); + Column column = new Column(colArray[0], colArray[1]); + String[] rows = json.get("span").getAsString().split("\u0000"); + boolean startInc = json.get("startInc").getAsBoolean(); + boolean endInc = json.get("endInc").getAsBoolean(); + Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc); + return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build(); + } + + @Override + public JsonElement serialize(SpanBatchDeleteInformation batch, Type typeOfSrc, JsonSerializationContext context) { + JsonObject result = new JsonObject(); + result.add("class", new JsonPrimitive(batch.getClass().getName())); + result.add("batchSize", new JsonPrimitive(batch.getBatchSize())); + Column column = batch.getColumn(); + result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier())); + Span span = batch.getSpan(); + result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow())); + result.add("startInc", new JsonPrimitive(span.isStartInclusive())); + result.add("endInc", new JsonPrimitive(span.isEndInclusive())); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java index 152d156..7c4b3cc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java @@ -36,16 +36,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; * Incrementally exports SPARQL query results to Kafka topics. */ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { + private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class); - private final KafkaProducer<String, VisibilityBindingSet> producer; + /** * Constructs an instance given a Kafka producer. * - * @param producer - * for sending result set alerts to a broker. (not null) - * Can be created and configured by {@link KafkaBindingSetExporterFactory} + * @param producer for sending result set alerts to a broker. 
(not null) Can be created and configured by + * {@link KafkaBindingSetExporterFactory} */ public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) { super(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java index 84d3ce6..54c39b7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java @@ -24,8 +24,11 @@ import static java.util.Objects.requireNonNull; import java.util.Collections; import org.apache.fluo.api.client.TransactionBase; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -36,14 +39,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; public class RyaBindingSetExporter implements IncrementalBindingSetExporter { private final PrecomputedJoinStorage pcjStorage; + private final 
PeriodicQueryResultStorage periodicStorage; /** * Constructs an instance of {@link RyaBindingSetExporter}. * * @param pcjStorage - The PCJ storage the new results will be exported to. (not null) */ - public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) { + public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, PeriodicQueryResultStorage periodicStorage) { this.pcjStorage = checkNotNull(pcjStorage); + this.periodicStorage = checkNotNull(periodicStorage); } @Override @@ -59,8 +64,12 @@ public class RyaBindingSetExporter implements IncrementalBindingSetExporter { final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); try { - pcjStorage.addResults(pcjId, Collections.singleton(result)); - } catch (final PCJStorageException e) { + if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) { + periodicStorage.addPeriodicQueryResults(pcjId, Collections.singleton(result)); + } else { + pcjStorage.addResults(pcjId, Collections.singleton(result)); + } + } catch (final PCJStorageException | PeriodicQueryStorageException e) { throw new ResultExportException("A result could not be exported to Rya.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java index 86d593f..82ce9c6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java @@ -28,8 +28,10 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import com.google.common.base.Optional; @@ -62,9 +64,10 @@ public class RyaBindingSetExporterFactory implements IncrementalBindingSetExport // Setup Rya PCJ Storage. final String ryaInstanceName = params.getRyaInstanceName().get(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName); - + final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName); + // Make the exporter. 
- final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage); + final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage, periodicStorage); return Optional.of(exporter); } catch (final AccumuloException | AccumuloSecurityException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index ac131e3..3a731c2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -30,12 +30,14 @@ import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater; import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -50,7 +52,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public abstract class BindingSetUpdater extends AbstractObserver { private static final Logger log = Logger.getLogger(BindingSetUpdater.class); - // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -60,6 +61,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { private final QueryResultUpdater queryUpdater = new QueryResultUpdater(); private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater(); private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater(); + private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater(); @Override public abstract ObservedColumn getObservedColumn(); @@ -131,6 +133,15 @@ public abstract class BindingSetUpdater extends AbstractObserver { throw new RuntimeException("Could not process a Join node.", e); } break; + + case PERIODIC_QUERY: + final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId); + try{ + periodicQueryUpdater.updatePeriodicBinResults(tx, observedBindingSet, parentPeriodicQuery); + } catch(Exception e) { + throw new RuntimeException("Could not process PeriodicBin node.", e); + } + break; case AGGREGATION: final AggregationMetadata parentAggregation = queryDao.readAggregationMetadata(tx, parentNodeId); @@ -141,8 +152,9 @@ public abstract class BindingSetUpdater extends AbstractObserver { } break; + default: - throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, or Query, but was " + parentNodeType); + throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType); } } 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index f5c7177..ee03334 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.query.BindingSet; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index 141ccc7..28e31d8 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.query.BindingSet; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java new file mode 100644 index 0000000..e7072e7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + +/** + * This Observer is responsible for assigning Periodic Bin Ids to BindingSets. + * This class delegates to the {@link BindingSetUpdater} process method, which + * uses the {@link PeriodicQueryUpdater} to extract the time stamp from the BindingSet. + * The PeriodicQueryUpdater creates one instance of the given BindingSet for each bin + * that the time stamp is assigned to by the updater, and these BindingSets are written + * to the parent node of the given PeriodicQueryMetadata node. 
+ * + */ +public class PeriodicQueryObserver extends BindingSetUpdater { + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception { + requireNonNull(tx); + requireNonNull(row); + + // Read the Periodic Query metadata. + final String periodicBinNodeId = BindingSetRow.make(row).getNodeId(); + final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId); + + // Read the Visibility Binding Set from the Value. + final Bytes valueBytes = tx.get(row, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET); + final VisibilityBindingSet periodicBinBindingSet = BS_SERDE.deserialize(valueBytes); + + // Figure out which node needs to handle the new metadata.
+ final String parentNodeId = periodicBinMetadata.getParentNodeId(); + + return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index b675ba7..fbdca08 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -29,7 +29,6 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.accumulo.utils.VisibilitySimplifier; -import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory; @@ -38,6 +37,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporter import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index b0548b4..69a651e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.query.BindingSet; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index 3c43885..6fc8e91 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -34,12 +34,12 @@ import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; -import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import com.google.common.base.Charsets; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java index 3bc8da6..ff42a0f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java @@ -321,6 +321,13 @@ public class AggregationMetadata extends CommonNodeMetadata { this.varOrder = varOrder; return this; } + + /** + * @return the variable order of binding sets that are emitted by this node. + */ + public VariableOrder getVariableOrder() { + return varOrder; + } /** * @param parentNodeId - The Node ID of this node's parent. @@ -330,6 +337,10 @@ public class AggregationMetadata extends CommonNodeMetadata { this.parentNodeId = parentNodeId; return this; } + + public String getParentNodeId() { + return parentNodeId; + } /** * @param childNodeId - The Node ID of this node's child. @@ -360,6 +371,13 @@ public class AggregationMetadata extends CommonNodeMetadata { this.groupByVariables = groupByVariables; return this; } + + /** + * @return variable order that defines how data is grouped for the aggregation function + */ + public VariableOrder getGroupByVariableOrder() { + return groupByVariables; + } /** * @return An instance of {@link AggregationMetadata} build using this builder's values. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java index 8866bd4..7e2e995 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java @@ -18,19 +18,19 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import edu.umd.cs.findbugs.annotations.Nullable; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; - import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Objects; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + /** * Metadata that is specific to Filter nodes. 
*/ @@ -38,8 +38,7 @@ import com.google.common.base.Objects; @DefaultAnnotation(NonNull.class) public class FilterMetadata extends CommonNodeMetadata { - private final String originalSparql; - private final int filterIndexWithinSparql; + private final String filterSparql; private final String parentNodeId; private final String childNodeId; @@ -48,7 +47,7 @@ public class FilterMetadata extends CommonNodeMetadata { * * @param nodeId - The ID the Fluo app uses to reference this node. (not null) * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) - * @param originalSparql - The original SPARQL query the filter is derived from. (not null) + * @param filterSparql - SPARQL query representing the filter as generated by {@link FilterSerializer#serialize}. (not null) * @param filterIndexWithinSparql - The index of the filter within the original SPARQL query * that this node processes. (not null) * @param parentNodeId - The node id of this node's parent. (not null) @@ -57,14 +56,11 @@ public class FilterMetadata extends CommonNodeMetadata { public FilterMetadata( final String nodeId, final VariableOrder varOrder, - final String originalSparql, - final int filterIndexWithinSparql, + final String filterSparql, final String parentNodeId, final String childNodeId) { super(nodeId, varOrder); - this.originalSparql = checkNotNull(originalSparql); - checkArgument(filterIndexWithinSparql >= 0 , "filterIndexWithinSparql must be >= 0, was " + filterIndexWithinSparql); - this.filterIndexWithinSparql = filterIndexWithinSparql; + this.filterSparql = checkNotNull(filterSparql); this.parentNodeId = checkNotNull(parentNodeId); this.childNodeId = checkNotNull(childNodeId); } @@ -72,16 +68,8 @@ public class FilterMetadata extends CommonNodeMetadata { /** * @return The original SPARQL query the filter is derived from. 
*/ - public String getOriginalSparql() { - return originalSparql; - } - - /** - * @return The index of the filter within the original SPARQL query that - * this node processes. - */ - public int getFilterIndexWithinSparql() { - return filterIndexWithinSparql; + public String getFilterSparql() { + return filterSparql; } /** @@ -103,8 +91,7 @@ public class FilterMetadata extends CommonNodeMetadata { return Objects.hashCode( super.getNodeId(), super.getVariableOrder(), - originalSparql, - filterIndexWithinSparql, + filterSparql, parentNodeId, childNodeId); } @@ -119,8 +106,7 @@ public class FilterMetadata extends CommonNodeMetadata { if(super.equals(o)) { final FilterMetadata filterMetadata = (FilterMetadata)o; return new EqualsBuilder() - .append(originalSparql, filterMetadata.originalSparql) - .append(filterIndexWithinSparql, filterMetadata.filterIndexWithinSparql) + .append(filterSparql, filterMetadata.filterSparql) .append(parentNodeId, filterMetadata.parentNodeId) .append(childNodeId, filterMetadata.childNodeId) .isEquals(); @@ -140,8 +126,7 @@ public class FilterMetadata extends CommonNodeMetadata { .append(" Variable Order: " + super.getVariableOrder() + "\n") .append(" Parent Node ID: " + parentNodeId + "\n") .append(" Child Node ID: " + childNodeId + "\n") - .append(" Original SPARQL: " + originalSparql + "\n") - .append(" Filter Index Within SPARQL: " + filterIndexWithinSparql + "\n") + .append(" Original SPARQL: " + filterSparql + "\n") .append("}") .toString(); } @@ -164,8 +149,7 @@ public class FilterMetadata extends CommonNodeMetadata { private final String nodeId; private VariableOrder varOrder; - private String originalSparql; - private int filterIndexWithinSparql; + private String filterSparql; private String parentNodeId; private String childNodeId; @@ -202,20 +186,8 @@ public class FilterMetadata extends CommonNodeMetadata { * @param originalSparql - The original SPARQL query the filter is derived from. 
* @return This builder so that method invocations may be chained. */ - public Builder setOriginalSparql(final String originalSparql) { - this.originalSparql = originalSparql; - return this; - } - - /** - * Set the index of the filter within the original SPARQL query that this node processes. - * - * @param filterIndexWithinSparql - The index of the filter within the original - * SPARQL query that this node processes. - * @return This builder so that method invocations may be chained. - */ - public Builder setFilterIndexWithinSparql(final int filterIndexWithinSparql) { - this.filterIndexWithinSparql = filterIndexWithinSparql; + public Builder setFilterSparql(final String originalSparql) { + this.filterSparql = originalSparql; return this; } @@ -248,8 +220,7 @@ public class FilterMetadata extends CommonNodeMetadata { return new FilterMetadata( nodeId, varOrder, - originalSparql, - filterIndexWithinSparql, + filterSparql, parentNodeId, childNodeId); }