Repository: asterixdb Updated Branches: refs/heads/master 028537d1f -> c3c235743
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java index f269a11..72941f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java @@ -47,8 +47,22 @@ public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePush boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter) throws HyracksDataException { + this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, + minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, + missingWriterFactory, searchCallbackFactory, appendIndexFilter, false, null, null); + } + + public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, + int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, + boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, + ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter, + boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue, + byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException { super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, - retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter); + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, + appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue, + searchCallbackProceedResultTrueValue); this.lowKeyInclusive = lowKeyInclusive; this.highKeyInclusive = highKeyInclusive; if (lowKeyFields != null && lowKeyFields.length > 0) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ILSMIndexCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ILSMIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ILSMIndexCursor.java index 979ff51..67ac5a8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ILSMIndexCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ILSMIndexCursor.java @@ -33,4 +33,14 @@ public interface ILSMIndexCursor extends IIndexCursor { * @return the max tuple of the corresponding component's filter */ ITupleReference getFilterMaxTuple(); + + /** + * Returns the result of the current SearchOperationCallback.proceed(). + * This method is used for the secondary-index searches. + * + * @return true if SearchOperationCallback.proceed() succeeded + * false otherwise + */ + boolean getSearchOperationCallbackProceedResult(); + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 41fdc41..f562379 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -90,11 +90,27 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput protected boolean failed = false; private final IOperatorStats stats; + // Used when the result of the search operation callback needs to be passed. + protected boolean appendSearchCallbackProceedResult; + protected byte[] searchCallbackProceedResultFalseValue; + protected byte[] searchCallbackProceedResultTrueValue; + public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter) throws HyracksDataException { + this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, false, null, + null); + } + + public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, + boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, + ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter, + boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue, + byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException { this.ctx = ctx; this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.retainInput = retainInput; @@ -115,6 +131,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput maxFilterKey = new PermutingFrameTupleReference(); maxFilterKey.setFieldPermutation(maxFilterFieldIndexes); } + this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult; + this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue; + this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue; stats = new OperatorStats(getDisplayName()); if (ctx.getStatsCollector() != null) { ctx.getStatsCollector().add(stats); @@ -139,8 +158,15 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput accessor = new FrameTupleAccessor(inputRecDesc); if (retainMissing) { int fieldCount = getFieldCount(); - nonMatchTupleBuild = new ArrayTupleBuilder(fieldCount); + // Field count in case searchCallback.proceed() result is needed. + int finalFieldCount = appendSearchCallbackProceedResult ? fieldCount + 1 : fieldCount; + nonMatchTupleBuild = new ArrayTupleBuilder(finalFieldCount); buildMissingTuple(fieldCount, nonMatchTupleBuild, nonMatchWriter); + if (appendSearchCallbackProceedResult) { + // Writes the success result in the last field in case we need to write down + // the result of searchOperationCallback.proceed(). This value can't be missing even for this case. + writeSearchCallbackProceedResult(nonMatchTupleBuild, true); + } } else { nonMatchTupleBuild = null; } @@ -183,6 +209,10 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput } ITupleReference tuple = cursor.getTuple(); writeTupleToOutput(tuple); + if (appendSearchCallbackProceedResult) { + writeSearchCallbackProceedResult(tb, + ((ILSMIndexCursor) cursor).getSearchOperationCallbackProceedResult()); + } if (appendIndexFilter) { writeFilterTupleToOutput(((ILSMIndexCursor) cursor).getFilterMinTuple()); writeFilterTupleToOutput(((ILSMIndexCursor) cursor).getFilterMaxTuple()); @@ -274,6 +304,18 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput } } + /** + * Write the result of a SearchCallback.proceed() if it is needed. + */ + private void writeSearchCallbackProceedResult(ArrayTupleBuilder atb, boolean searchCallbackProceedResult) + throws HyracksDataException { + if (!searchCallbackProceedResult) { + atb.addField(searchCallbackProceedResultFalseValue, 0, searchCallbackProceedResultFalseValue.length); + } else { + atb.addField(searchCallbackProceedResultTrueValue, 0, searchCallbackProceedResultTrueValue.length); + } + } + private void writeFilterTupleToOutput(ITupleReference tuple) throws IOException { if (tuple != null) { writeTupleToOutput(tuple); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java index 78564fd..1209e17 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java @@ -58,6 +58,7 @@ public class LSMBTreePointSearchCursor extends EnforcedIndexCursor implements IL private int foundIn = -1; private ITupleReference frameTuple; private List<ILSMComponent> operationalComponents; + private boolean resultOfSearchCallbackProceed = false; private final long[] hashes = BloomFilter.createHashArray(); @@ -82,7 +83,10 @@ public class LSMBTreePointSearchCursor extends EnforcedIndexCursor implements IL btreeCursors[i].next(); // We use the predicate's to lock the key instead of the tuple that we get from cursor // to avoid copying the tuple when we do the "unlatch dance". - if (reconciled || searchCallback.proceed(predicate.getLowKey())) { + if (!reconciled) { + resultOfSearchCallbackProceed = searchCallback.proceed(predicate.getLowKey()); + } + if (reconciled || resultOfSearchCallbackProceed) { // if proceed is successful, then there's no need for doing the "unlatch dance" if (((ILSMTreeTupleReference) btreeCursors[i].getTuple()).isAntimatter()) { if (reconciled) { @@ -238,4 +242,9 @@ public class LSMBTreePointSearchCursor extends EnforcedIndexCursor implements IL } } } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return resultOfSearchCallbackProceed; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java index 5d23fef..d8feab1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java @@ -52,6 +52,7 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { private BTreeAccessor[] btreeAccessors; private ArrayTupleBuilder tupleBuilder; private boolean canCallProceed = true; + private boolean resultOfSearchCallbackProceed = false; private int tupleFromMemoryComponentCount = 0; public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) { @@ -104,46 +105,49 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) { if (!outputPriorityQueue.isEmpty()) { PriorityQueueElement queueHead = outputPriorityQueue.peek(); - if (canCallProceed && includeMutableComponent && !searchCallback.proceed(queueHead.getTuple())) { - // In case proceed() fails and there is an in-memory component, - // we can't simply use this element since there might be a change. - PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); - if (mutableElement != null) { - // Copies the current queue head - if (tupleBuilder == null) { - tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount()); + if (canCallProceed && includeMutableComponent) { + resultOfSearchCallbackProceed = searchCallback.proceed(queueHead.getTuple()); + if (!resultOfSearchCallbackProceed) { + // In case proceed() fails and there is an in-memory component, + // we can't simply use this element since there might be a change. + PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); + if (mutableElement != null) { + // Copies the current queue head + if (tupleBuilder == null) { + tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount()); + } + TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount()); + copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + // Unlatches/unpins the leaf page of the index. + rangeCursors[0].close(); + // Reconcile. + searchCallback.reconcile(copyTuple); + // Re-traverses the index. + reusablePred.setLowKey(copyTuple, true); + btreeAccessors[0].search(rangeCursors[0], reusablePred); + pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); + // now that we have completed the search and we have latches over the pages, + // it is safe to complete the operation.. but as per the API of the callback + // we only complete if we're producing this tuple + // get head again + queueHead = outputPriorityQueue.peek(); + /* + * We need to restart in one of two cases: + * 1. no more elements in the priority queue. + * 2. the key of the head has changed (which means we need to call proceed) + */ + if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) { + // cancel since we're not continuing + searchCallback.cancel(copyTuple); + continue; + } + searchCallback.complete(copyTuple); + // it is safe to proceed now + } else { + // There are no more elements in the memory component.. can safely skip locking for the + // remaining operations + includeMutableComponent = false; } - TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount()); - copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); - // Unlatches/unpins the leaf page of the index. - rangeCursors[0].close(); - // Reconcile. - searchCallback.reconcile(copyTuple); - // Re-traverses the index. - reusablePred.setLowKey(copyTuple, true); - btreeAccessors[0].search(rangeCursors[0], reusablePred); - pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); - // now that we have completed the search and we have latches over the pages, - // it is safe to complete the operation.. but as per the API of the callback - // we only complete if we're producing this tuple - // get head again - queueHead = outputPriorityQueue.peek(); - /* - * We need to restart in one of two cases: - * 1. no more elements in the priority queue. - * 2. the key of the head has changed (which means we need to call proceed) - */ - if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) { - // cancel since we're not continuing - searchCallback.cancel(copyTuple); - continue; - } - searchCallback.complete(copyTuple); - // it is safe to proceed now - } else { - // There are no more elements in the memory component.. can safely skip locking for the - // remaining operations - includeMutableComponent = false; } } @@ -368,4 +372,10 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { initPriorityQueue(); canCallProceed = true; } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return resultOfSearchCallbackProceed; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java index 02574ca..baf0d4a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java @@ -104,4 +104,9 @@ public class LSMBTreeSearchCursor extends EnforcedIndexCursor implements ILSMInd public ITupleReference getFilterMaxTuple() { return currentCursor.getFilterMaxTuple(); } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java index 8dcbcc4..08e7b91 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java @@ -166,4 +166,10 @@ public abstract class LSMBTreeWithBuddyAbstractCursor extends EnforcedIndexCurso public ITupleReference doGetTuple() { return frameTuple; } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java index 900ee32..445a005 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java @@ -311,4 +311,9 @@ public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implement throws HyracksDataException { return cmp.compare(tupleA, tupleB); } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java index fea9373..4d444b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java @@ -216,4 +216,10 @@ public class LSMInvertedIndexSearchCursor extends EnforcedIndexCursor implements } return operationalComponents.get(accessorIndex).getLSMComponentFilter(); } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java index d41e406..176f767 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.storage.am.rtree.impls.RTreeSearchCursor; import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate; import org.apache.hyracks.storage.common.EnforcedIndexCursor; import org.apache.hyracks.storage.common.ICursorInitialState; +import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.MultiComparator; @@ -54,7 +55,7 @@ public abstract class LSMRTreeAbstractCursor extends EnforcedIndexCursor impleme protected RTreeAccessor[] rtreeAccessors; protected BTreeAccessor[] btreeAccessors; protected BloomFilter[] bloomFilters; - private MultiComparator btreeCmp; + protected MultiComparator btreeCmp; protected int numberOfTrees; protected SearchPredicate rtreeSearchPredicate; protected RangePredicate btreeRangePredicate; @@ -63,6 +64,7 @@ public abstract class LSMRTreeAbstractCursor extends EnforcedIndexCursor impleme protected ILSMHarness lsmHarness; protected boolean foundNext; protected final ILSMIndexOperationContext opCtx; + protected ISearchOperationCallback searchCallback; protected List<ILSMComponent> operationalComponents; protected long[] hashes = BloomFilter.createHashArray(); @@ -86,6 +88,7 @@ public abstract class LSMRTreeAbstractCursor extends EnforcedIndexCursor impleme operationalComponents = lsmInitialState.getOperationalComponents(); lsmHarness = lsmInitialState.getLSMHarness(); numberOfTrees = operationalComponents.size(); + searchCallback = lsmInitialState.getSearchOperationCallback(); int numComponenets = operationalComponents.size(); if (rtreeCursors == null || rtreeCursors.length != numComponenets) { @@ -180,4 +183,9 @@ public abstract class LSMRTreeAbstractCursor extends EnforcedIndexCursor impleme return frameTuple; } + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java index d485f64..c79735f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java @@ -31,6 +31,7 @@ public class LSMRTreeSearchCursor extends LSMRTreeAbstractCursor { private int currentCursor; private final PermutingTupleReference btreeTuple; + private boolean resultOfsearchCallbackProceed = false; public LSMRTreeSearchCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields) { super(opCtx); @@ -96,6 +97,8 @@ public class LSMRTreeSearchCursor extends LSMRTreeAbstractCursor { while (rtreeCursors[currentCursor].hasNext()) { rtreeCursors[currentCursor].next(); ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple(); + // Call proceed() to do necessary operations before returning this tuple. + resultOfsearchCallbackProceed = searchCallback.proceed(currentTuple); btreeTuple.reset(rtreeCursors[currentCursor].getTuple()); boolean killerTupleFound = false; for (int i = 0; i < currentCursor && !killerTupleFound; i++) { @@ -138,4 +141,9 @@ public class LSMRTreeSearchCursor extends LSMRTreeAbstractCursor { searchNextCursor(); } + @Override + public boolean getSearchOperationCallbackProceedResult() { + return resultOfsearchCallbackProceed; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java index 77bf58e..449c711 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java @@ -173,4 +173,9 @@ public class LSMRTreeWithAntiMatterTuplesFlushCursor extends EnforcedIndexCursor return null; } + @Override + public boolean getSearchOperationCallbackProceedResult() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java index 4547063..094acbc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java @@ -59,6 +59,7 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs private int numMemoryComponents; private boolean open; protected ISearchOperationCallback searchCallback; + private boolean resultOfsearchCallBackProceed = false; public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) { this(opCtx, false); @@ -150,7 +151,7 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs // reconcile() and complete() can be added later after considering the semantics. // Call proceed() to do necessary operations before returning this tuple. - searchCallback.proceed(currentTuple); + resultOfsearchCallBackProceed = searchCallback.proceed(currentTuple); if (searchMemBTrees(currentTuple, currentCursor)) { // anti-matter tuple is NOT found foundNext = true; @@ -169,7 +170,7 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs // reconcile() and complete() can be added later after considering the semantics. // Call proceed() to do necessary operations before returning this tuple. - searchCallback.proceed(diskRTreeTuple); + resultOfsearchCallBackProceed = searchCallback.proceed(diskRTreeTuple); if (searchMemBTrees(diskRTreeTuple, numMemoryComponents)) { // anti-matter tuple is NOT found foundNext = true; @@ -185,7 +186,7 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs // reconcile() and complete() can be added later after considering the semantics. // Call proceed() to do necessary operations before returning this tuple. // Since in-memory components don't exist, we can skip searching in-memory B-Trees. - searchCallback.proceed(diskRTreeTuple); + resultOfsearchCallBackProceed = searchCallback.proceed(diskRTreeTuple); foundNext = true; frameTuple = diskRTreeTuple; return true; @@ -311,4 +312,9 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs } } } + + @Override + public boolean getSearchOperationCallbackProceedResult() { + return resultOfsearchCallBackProceed; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java index 0a4d2a7..46727cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java @@ -44,12 +44,26 @@ public class RTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato protected final boolean retainMissing; protected final IMissingWriterFactory missingWriterFactory; protected final ISearchOperationCallbackFactory searchCallbackFactory; + protected boolean appendOpCallbackProceedResult; + protected byte[] searchCallbackProceedResultFalseValue; + protected byte[] searchCallbackProceedResultTrueValue; public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] keyFields, boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter) { + this(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, + retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, appendIndexFilter, false, null, null); + } + + public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] keyFields, + boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory indexHelperFactory, + boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, + ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes, + int[] maxFilterFieldIndexes, boolean appendIndexFilter, boolean appendOpCallbackProceedResult, + byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue) { super(spec, 1, 1); this.indexHelperFactory = indexHelperFactory; this.retainInput = retainInput; @@ -63,6 +77,9 @@ public class RTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato this.maxFilterFieldIndexes = maxFilterFieldIndexes; this.appendIndexFilter = appendIndexFilter; this.outRecDescs[0] = outRecDesc; + this.appendOpCallbackProceedResult = appendOpCallbackProceedResult; + this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue; + this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue; } @Override @@ -71,6 +88,7 @@ public class RTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato return new RTreeSearchOperatorNodePushable(ctx, partition, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), keyFields, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory, - searchCallbackFactory, appendIndexFilter); + searchCallbackFactory, appendIndexFilter, appendOpCallbackProceedResult, + searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c3c23574/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java index 07b6a60..886285c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java @@ -42,8 +42,22 @@ public class RTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePush IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter) throws HyracksDataException { + this(ctx, partition, inputRecDesc, keyFields, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, false, null, + null); + } + + public RTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, + int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + boolean appendIndexFilter, boolean appendOpCallbackProceedResult, + byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue) + throws HyracksDataException { super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, - retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter); + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, + appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue, + searchCallbackProceedResultTrueValue); if (keyFields != null && keyFields.length > 0) { searchKey = new PermutingFrameTupleReference(); searchKey.setFieldPermutation(keyFields);