This is an automated email from the ASF dual-hosted git repository.
luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6e6b342 [ASTERIXDB-2708] Introduce batch and stateful point cursors
6e6b342 is described below
commit 6e6b3427d82a5b8b10aadb8aff095259cbfb9535
Author: luochen <[email protected]>
AuthorDate: Fri Apr 10 09:43:35 2020 -0700
[ASTERIXDB-2708] Introduce batch and stateful point cursors
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add a stateful btree point search cursor that uses previous search history
and exponential search algorithm to optimize point search performance
- Add a batching LSM btree point search cursor to perform point searches for
a batch of keys. Search states are cleared after each batch.
Change-Id: I0b0ade723895bcd71463df7a9703fe78a238e6c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5484
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Luo Chen <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../operators/physical/BTreeSearchPOperator.java | 30 +++-
.../metadata/declared/DatasetDataSource.java | 2 +-
.../metadata/declared/MetadataProvider.java | 18 ++-
.../storage/am/bloomfilter/impls/BloomFilter.java | 10 +-
.../storage/am/btree/api/IBTreeLeafFrame.java | 3 +
.../dataflow/BTreeSearchOperatorDescriptor.java | 4 +-
.../btree/frames/BTreeFieldPrefixNSMLeafFrame.java | 6 +
.../storage/am/btree/frames/BTreeNSMLeafFrame.java | 6 +
.../am/btree/frames/OrderedSlotManager.java | 65 ++++++++
.../hyracks/storage/am/btree/impls/BTree.java | 2 +-
.../storage/am/btree/impls/BatchPredicate.java | 122 +++++++++++++++
.../hyracks/storage/am/btree/impls/DiskBTree.java | 95 ++++--------
.../am/btree/impls/DiskBTreePointSearchCursor.java | 42 ++++-
.../am/btree/impls/DiskBTreeRangeSearchCursor.java | 39 +----
.../am/btree/impls/FieldPrefixSlotManager.java | 22 +++
.../storage/am/common/api/ISlotManager.java | 3 +
.../dataflow/IndexSearchOperatorNodePushable.java | 6 +-
...LSMBTreeBatchPointSearchOperatorDescriptor.java | 57 +++++++
...MBTreeBatchPointSearchOperatorNodePushable.java | 133 ++++++++++++++++
.../impls/LSMBTreeBatchPointSearchCursor.java | 90 +++++++++++
.../lsm/btree/impls/LSMBTreePointSearchCursor.java | 52 ++++---
.../am/lsm/rtree/impls/LSMRTreeAbstractCursor.java | 2 +-
.../am/rtree/impls/UnorderedSlotManager.java | 6 +
.../am/btree/DiskBTreePointSearchCursorTest.java | 2 +-
.../am/btree/DiskBTreeSearchCursorTest.java | 22 +--
.../cursor/LSMBTreeBatchPointSearchCursorTest.java | 170 +++++++++++++++++++++
26 files changed, 863 insertions(+), 146 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 94d3461..8c2c1df 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -50,6 +50,7 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -138,7 +139,8 @@ public class BTreeSearchPOperator extends
IndexSearchPOperator {
builder.getJobSpec(), opSchema, typeEnv, context,
jobGenParams.getRetainInput(), retainMissing, dataset,
jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
jobGenParams.isLowKeyInclusive(),
jobGenParams.isHighKeyInclusive(), propagateFilter,
minFilterFieldIndexes, maxFilterFieldIndexes,
- tupleFilterFactory, outputLimit,
unnestMap.getGenerateCallBackProceedResultVar());
+ tupleFilterFactory, outputLimit,
unnestMap.getGenerateCallBackProceedResultVar(),
+ isPrimaryIndexPointSearch(op));
IOperatorDescriptor opDesc = btreeSearch.first;
opDesc.setSourceLocation(unnestMap.getSourceLocation());
@@ -149,6 +151,32 @@ public class BTreeSearchPOperator extends
IndexSearchPOperator {
builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
}
+ private boolean isPrimaryIndexPointSearch(ILogicalOperator op) {
+ if (!isEqCondition || !isPrimaryIndex ||
!lowKeyVarList.equals(highKeyVarList)) {
+ return false;
+ }
+ Index searchIndex = ((DataSourceIndex) idx).getIndex();
+ int numberOfKeyFields = searchIndex.getKeyFieldNames().size();
+
+ if (lowKeyVarList.size() != numberOfKeyFields || highKeyVarList.size()
!= numberOfKeyFields) {
+ return false;
+ }
+
+ IPhysicalPropertiesVector vector =
op.getInputs().get(0).getValue().getDeliveredPhysicalProperties();
+ if (vector != null) {
+ for (ILocalStructuralProperty property :
vector.getLocalProperties()) {
+ if (property.getPropertyType() ==
PropertyType.LOCAL_ORDER_PROPERTY) {
+ LocalOrderProperty orderProperty = (LocalOrderProperty)
property;
+ if (orderProperty.getColumns().equals(lowKeyVarList)
+ && orderProperty.getOrders().stream().allMatch(o
-> o.equals(OrderKind.ASC))) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f3a88ab..f5ca79a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -127,7 +127,7 @@ public class DatasetDataSource extends DataSource {
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema,
typeEnv, context, true, false,
((DatasetDataSource) dataSource).getDataset(),
primaryIndex.getIndexName(), null, null, true,
true, false, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
- false);
+ false, false);
default:
throw new AlgebricksException("Unknown datasource type");
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1ee59bf..ff68e78 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -148,6 +148,7 @@ import
org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import
org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -499,7 +500,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
boolean retainMissing, Dataset dataset, String indexName, int[]
lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, boolean
propagateFilter, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
- boolean isIndexOnlyPlan) throws AlgebricksException {
+ boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch) throws
AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx,
dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
@@ -540,11 +541,16 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
BTreeSearchOperatorDescriptor btreeSearchOp;
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec,
outputRecDesc, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing,
- context.getMissingWriterFactory(), searchCallbackFactory,
minFilterFieldIndexes,
- maxFilterFieldIndexes, propagateFilter,
tupleFilterFactory, outputLimit, proceedIndexOnlyPlan,
- failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+ btreeSearchOp = !isSecondary && isPrimaryIndexPointSearch
+ ? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec,
outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput,
+ retainMissing, context.getMissingWriterFactory(),
searchCallbackFactory,
+ minFilterFieldIndexes, maxFilterFieldIndexes,
tupleFilterFactory, outputLimit)
+ : new BTreeSearchOperatorDescriptor(jobSpec,
outputRecDesc, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput, retainMissing,
+ context.getMissingWriterFactory(),
searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter,
tupleFilterFactory, outputLimit,
+ proceedIndexOnlyPlan, failValueForIndexOnlyPlan,
successValueForIndexOnlyPlan);
} else {
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec,
outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput, retainMissing,
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 062b1c7..6c0badc 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -124,10 +124,14 @@ public class BloomFilter {
}
public boolean contains(ITupleReference tuple, long[] hashes) throws
HyracksDataException {
+ computeHashes(tuple, hashes);
+ return contains(hashes);
+ }
+
+ public boolean contains(long[] hashes) throws HyracksDataException {
if (numPages == 0) {
return false;
}
- MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
if (version == BLOCKED_BLOOM_FILTER_VERSION) {
return blockContains(hashes);
} else {
@@ -135,6 +139,10 @@ public class BloomFilter {
}
}
+ public void computeHashes(ITupleReference tuple, long[] hashes) {
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+ }
+
private boolean blockContains(long[] hashes) throws HyracksDataException {
// take first hash to compute block id
long hash = Math.abs(hashes[0] % numBits);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
index 2a3c1eb..7ccf08f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
@@ -32,6 +32,9 @@ public interface IBTreeLeafFrame extends IBTreeFrame {
public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference pageTuple, MultiComparator cmp,
FindTupleMode ftm, FindTupleNoExactMatchPolicy ftp) throws
HyracksDataException;
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException;
+
public int findUpdateTupleIndex(ITupleReference tuple) throws
HyracksDataException;
public int findUpsertTupleIndex(ITupleReference tuple) throws
HyracksDataException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 001e250..128929c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -38,8 +38,8 @@ public class BTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
protected final int[] highKeyFields;
protected final boolean lowKeyInclusive;
protected final boolean highKeyInclusive;
- private final int[] minFilterFieldIndexes;
- private final int[] maxFilterFieldIndexes;
+ protected final int[] minFilterFieldIndexes;
+ protected final int[] maxFilterFieldIndexes;
protected final IIndexDataflowHelperFactory indexHelperFactory;
protected final boolean retainInput;
protected final boolean retainMissing;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index eb87c57..6584df3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -769,6 +769,12 @@ public class BTreeFieldPrefixNSMLeafFrame implements
IBTreeLeafFrame {
}
@Override
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException {
+ throw new UnsupportedOperationException("Stateful search is not
supported by BTreeFieldPrefixNSMLeafFrame");
+ }
+
+ @Override
public int getPageHeaderSize() {
return NEXT_LEAF_OFFSET;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 09a4db4..b1add3c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -313,6 +313,12 @@ public class BTreeNSMLeafFrame extends TreeIndexNSMFrame
implements IBTreeLeafFr
}
@Override
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException {
+ return slotManager.findTupleIndex(searchKey, pageTuple, cmp,
startIndex);
+ }
+
+ @Override
public void setMultiComparator(MultiComparator cmp) {
this.cmp = cmp;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
index bb06ec0..3931ebc 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
@@ -104,6 +104,71 @@ public class OrderedSlotManager extends
AbstractSlotManager {
}
@Override
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException {
+ int tupleCount = frame.getTupleCount();
+ if (tupleCount == 0) {
+ return -1;
+ } else if (startIndex >= tupleCount) {
+ return -tupleCount - 1;
+ }
+
+ int step = 1;
+ int index = startIndex;
+ int prevIndex = index;
+
+ // now we have key index < tupleCount - 1
+ // use exponential search to locate the key range that contains the
search key
+ // https://en.wikipedia.org/wiki/Exponential_search
+ while (index < tupleCount) {
+ frameTuple.resetByTupleIndex(frame, index);
+ int cmp = multiCmp.compare(searchKey, frameTuple);
+ if (cmp == 0) {
+ return index;
+ } else if (cmp > 0) {
+ prevIndex = index;
+ if (index + step < tupleCount) {
+ index = index + step;
+ step = step << 1;
+ } else {
+ if (index == tupleCount - 1) {
+ // we've already reached the last tuple
+ return -tupleCount - 1;
+ } else {
+ index = tupleCount - 1;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ if (index == startIndex) {
+ return -index - 1;
+ }
+
+ // perform binary search between prevPosition and position
+ // we must have prevIndex < keyIndex < index
+ // adopted from Collections.binarySearch
+ int low = prevIndex + 1;
+ int high = index - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ frameTuple.resetByTupleIndex(frame, mid);
+ int cmp = multiCmp.compare(searchKey, frameTuple);
+ if (cmp < 0) {
+ high = mid - 1;
+ } else if (cmp > 0) {
+ low = mid + 1;
+ } else {
+ return mid;
+ }
+ }
+ return -low - 1;
+ }
+
+ @Override
public int insertSlot(int tupleIndex, int tupleOff) {
int slotOff = getSlotOff(tupleIndex);
if (tupleIndex == GREATEST_KEY_INDICATOR) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 3a062af..1f0e447 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -887,7 +887,7 @@ public class BTree extends AbstractTreeIndex {
.getOrDefault(HyracksConstants.INDEX_CURSOR_STATS,
NoOpIndexCursorStats.INSTANCE));
}
- public BTreeRangeSearchCursor createPointCursor(boolean exclusive) {
+ public BTreeRangeSearchCursor createPointCursor(boolean exclusive,
boolean stateful) {
return createSearchCursor(exclusive);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java
new file mode 100644
index 0000000..acfde93
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.impls;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class BatchPredicate extends RangePredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final FrameTupleReference keyTuple;
+ protected final FrameTupleReference minFilterTuple;
+ protected final FrameTupleReference maxFilterTuple;
+
+ protected int keyIndex = -1;
+ protected IFrameTupleAccessor accessor;
+
+ public BatchPredicate(IFrameTupleAccessor accessor, MultiComparator
keyCmp, int[] keyFields,
+ int[] minFilterKeyFields, int[] maxFieldKeyFields) {
+ super(null, null, true, true, keyCmp, keyCmp);
+ this.keyIndex = 0;
+ this.accessor = accessor;
+ if (keyFields != null && keyFields.length > 0) {
+ this.keyTuple = new PermutingFrameTupleReference(keyFields);
+ } else {
+ this.keyTuple = new FrameTupleReference();
+ }
+ if (minFilterKeyFields != null && minFilterKeyFields.length > 0) {
+ this.minFilterTuple = new
PermutingFrameTupleReference(minFilterKeyFields);
+ } else {
+ this.minFilterTuple = null;
+ }
+ if (maxFieldKeyFields != null && maxFieldKeyFields.length > 0) {
+ this.maxFilterTuple = new
PermutingFrameTupleReference(maxFieldKeyFields);
+ } else {
+ this.maxFilterTuple = null;
+ }
+
+ }
+
+ public void reset(IFrameTupleAccessor accessor) {
+ this.keyIndex = -1;
+ this.accessor = accessor;
+ }
+
+ private boolean isValid() {
+ return accessor != null && keyIndex >= 0 && keyIndex <
accessor.getTupleCount();
+ }
+
+ @Override
+ public ITupleReference getLowKey() {
+ return isValid() ? keyTuple : null;
+ }
+
+ @Override
+ public ITupleReference getHighKey() {
+ return isValid() ? keyTuple : null;
+ }
+
+ @Override
+ public ITupleReference getMinFilterTuple() {
+ return isValid() ? minFilterTuple : null;
+ }
+
+ @Override
+ public ITupleReference getMaxFilterTuple() {
+ return isValid() ? maxFilterTuple : null;
+ }
+
+ @Override
+ public boolean isPointPredicate(MultiComparator originalKeyComparator)
throws HyracksDataException {
+ return true;
+ }
+
+ public boolean hasNext() {
+ return accessor != null && keyIndex < accessor.getTupleCount() - 1;
+ }
+
+ public void next() {
+ keyIndex++;
+ if (isValid()) {
+ keyTuple.reset(accessor, keyIndex);
+ if (minFilterTuple != null) {
+ minFilterTuple.reset(accessor, keyIndex);
+ }
+ if (maxFilterTuple != null) {
+ maxFilterTuple.reset(accessor, keyIndex);
+ }
+ }
+ }
+
+ public int getKeyIndex() {
+ return keyIndex;
+ }
+
+ public int getNumKeys() {
+ return accessor.getTupleCount();
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
index 6459471..ae6bbaa 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -77,7 +78,6 @@ public class DiskBTree extends BTree {
ctx.reset();
ctx.setPred((RangePredicate) searchPred);
ctx.setCursor(cursor);
- // simple index scan
if (ctx.getPred().getLowKeyComparator() == null) {
ctx.getPred().setLowKeyComparator(ctx.getCmp());
}
@@ -87,87 +87,52 @@ public class DiskBTree extends BTree {
cursor.setBufferCache(bufferCache);
cursor.setFileId(getFileId());
- DiskBTreeRangeSearchCursor diskCursor = (DiskBTreeRangeSearchCursor)
cursor;
-
- if (diskCursor.numSearchPages() == 0) {
- // we have to search from root to leaf
- ICachedPage rootNode =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
- diskCursor.addSearchPage(rootPage);
- searchDown(rootNode, rootPage, ctx, diskCursor);
- } else {
- // we first check whether the leaf page matches because page may
be shifted during cursor.hasNext
- if (ctx.getLeafFrame().getPage() != diskCursor.getPage()) {
- ctx.getLeafFrame().setPage(diskCursor.getPage());
- ctx.getCursorInitialState().setPage(diskCursor.getPage());
- ctx.getCursorInitialState().setPageId(diskCursor.getPageId());
- }
-
- if (fitInPage(ctx.getPred().getLowKey(),
ctx.getPred().getLowKeyComparator(), ctx.getLeafFrame())) {
- // the input still falls into the previous search leaf
- diskCursor.open(ctx.getCursorInitialState(), searchPred);
- } else {
- // unpin the previous leaf page
- bufferCache.unpin(ctx.getLeafFrame().getPage());
- diskCursor.removeLastSearchPage();
-
- ICachedPage page = searchUp(ctx, diskCursor);
- int pageId = diskCursor.getLastSearchPage();
-
- searchDown(page, pageId, ctx, diskCursor);
+ if (cursor instanceof DiskBTreePointSearchCursor) {
+ DiskBTreePointSearchCursor pointCursor =
(DiskBTreePointSearchCursor) cursor;
+ int lastPageId = pointCursor.getLastPageId();
+ if (lastPageId != BufferCache.INVALID_PAGEID) {
+ // check whether the last leaf page contains this key
+ ICachedPage lastPage =
+
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), lastPageId),
false);
+ ctx.getLeafFrame().setPage(lastPage);
+ if (fitInPage(ctx.getPred().getLowKey(),
ctx.getPred().getLowKeyComparator(), ctx.getLeafFrame())) {
+ // use this page
+ ctx.getCursorInitialState().setPage(lastPage);
+ ctx.getCursorInitialState().setPageId(lastPageId);
+ pointCursor.open(ctx.getCursorInitialState(), searchPred);
+ return;
+ } else {
+ // release the last page and clear the states of this
cursor
+ // then retry the search from root to leaf
+ bufferCache.unpin(lastPage);
+ pointCursor.clearSearchState();
+ }
}
}
- }
-
- private ICachedPage searchUp(BTreeOpContext ctx,
DiskBTreeRangeSearchCursor cursor) throws HyracksDataException {
- int index = cursor.numSearchPages() - 1;
- // no need to check root page
- for (; index >= 0; index--) {
- int pageId = cursor.getLastSearchPage();
- ICachedPage page =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId), false);
- ctx.getInteriorFrame().setPage(page);
- if (index == 0 || fitInPage(ctx.getPred().getLowKey(),
ctx.getPred().getLowKeyComparator(),
- ctx.getInteriorFrame())) {
- // we've found the right page
- return page;
- } else {
- // unpin the current page
- bufferCache.unpin(page);
- cursor.removeLastSearchPage();
- }
- }
-
- // if no page is available (which is the case for single-level BTree)
- // we simply return the root page
- ICachedPage page =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
- cursor.addSearchPage(rootPage);
- return page;
+ ICachedPage rootNode =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
+ searchDown(rootNode, rootPage, ctx, cursor);
}
private boolean fitInPage(ITupleReference key, MultiComparator comparator,
IBTreeFrame frame)
throws HyracksDataException {
+ // assume that search keys are sorted (non-decreasing)
ITupleReference rightmostTuple = frame.getRightmostTuple();
int cmp = comparator.compare(key, rightmostTuple);
- if (cmp > 0) {
- return false;
- }
- ITupleReference leftmostTuple = frame.getLeftmostTuple();
- return comparator.compare(key, leftmostTuple) >= 0;
+ return cmp <= 0;
}
- private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx,
DiskBTreeRangeSearchCursor cursor)
+ private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx,
ITreeIndexCursor cursor)
throws HyracksDataException {
ICachedPage currentPage = page;
ctx.getInteriorFrame().setPage(currentPage);
-
try {
int childPageId = pageId;
while (!ctx.getInteriorFrame().isLeaf()) {
// walk down the tree until we find the leaf
childPageId =
ctx.getInteriorFrame().getChildPageId(ctx.getPred());
-
- // save the child page tuple index
- cursor.addSearchPage(childPageId);
bufferCache.unpin(currentPage);
+ pageId = childPageId;
+
currentPage =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), childPageId),
false);
ctx.getInteriorFrame().setPage(currentPage);
}
@@ -233,9 +198,9 @@ public class DiskBTree extends BTree {
}
@Override
- public BTreeRangeSearchCursor createPointCursor(boolean exclusive) {
+ public BTreeRangeSearchCursor createPointCursor(boolean exclusive,
boolean stateful) {
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame)
btree.getLeafFrameFactory().createFrame();
- return new DiskBTreePointSearchCursor(leafFrame, exclusive);
+ return new DiskBTreePointSearchCursor(leafFrame, exclusive,
stateful);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
index 7814e60..1bf3ecf 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
@@ -25,13 +25,27 @@ import
org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
import
org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
public class DiskBTreePointSearchCursor extends DiskBTreeRangeSearchCursor {
+ /**
+ * A stateful cursor keeps the search state (last search page Id + index)
across multiple searches
+ * until {@link #clearSearchState()} is called explicity
+ */
+ private final boolean stateful;
private boolean nextHasBeenCalled;
- public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes) {
+ private int lastPageId = BufferCache.INVALID_PAGEID;
+ private int lastTupleIndex = 0;
+
+ public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes, boolean stateful) {
super(frame, exclusiveLatchNodes);
+ this.stateful = stateful;
+ }
+
+ public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes) {
+ this(frame, exclusiveLatchNodes, false);
}
@Override
@@ -71,6 +85,32 @@ public class DiskBTreePointSearchCursor extends
DiskBTreeRangeSearchCursor {
// only get the low key position
tupleIndex = getLowKeyIndex();
+ if (stateful) {
+ lastPageId = pageId;
+ if (tupleIndex >= 0) {
+ lastTupleIndex = tupleIndex;
+ } else {
+ lastTupleIndex = -tupleIndex - 1;
+ }
+ }
+ }
+
+ public int getLastPageId() {
+ return lastPageId;
+ }
+
+ @Override
+ protected int getLowKeyIndex() throws HyracksDataException {
+ if (stateful) {
+ return frame.findTupleIndex(lowKey, frameTuple, lowKeyCmp,
lastTupleIndex);
+ } else {
+ return super.getLowKeyIndex();
+ }
+ }
+
+ public void clearSearchState() {
+ this.lastPageId = BufferCache.INVALID_PAGEID;
+ this.lastTupleIndex = 0;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
index d26378b..d788398 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
@@ -19,25 +19,20 @@
package org.apache.hyracks.storage.am.btree.impls;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.common.IIndexCursorStats;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class DiskBTreeRangeSearchCursor extends BTreeRangeSearchCursor {
- // keep track of the pages (root -> leaf) we've searched
- protected final List<Integer> searchPages = new ArrayList<>(5);
-
public DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes) {
- super(frame, exclusiveLatchNodes);
+ this(frame, exclusiveLatchNodes, NoOpIndexCursorStats.INSTANCE);
}
- public DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes, IIndexCursorStats stats) {
+ protected DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean
exclusiveLatchNodes, IIndexCursorStats stats) {
super(frame, exclusiveLatchNodes, stats);
}
@@ -50,7 +45,6 @@ public class DiskBTreeRangeSearchCursor extends
BTreeRangeSearchCursor {
fetchNextLeafPage(nextLeafPage);
tupleIndex = 0;
// update page ids and positions
- searchPages.set(searchPages.size() - 1, nextLeafPage);
stopTupleIndex = getHighKeyIndex();
if (stopTupleIndex < 0) {
return false;
@@ -67,28 +61,6 @@ public class DiskBTreeRangeSearchCursor extends
BTreeRangeSearchCursor {
return true;
}
- @Override
- protected void resetBeforeOpen() throws HyracksDataException {
- // do nothing
- // we allow a disk btree range cursor be stateful, that is, the next
search can be based on the previous search
- }
-
- public int numSearchPages() {
- return searchPages.size();
- }
-
- public void addSearchPage(int page) {
- searchPages.add(page);
- }
-
- public int getLastSearchPage() {
- return searchPages.get(searchPages.size() - 1);
- }
-
- public int removeLastSearchPage() {
- return searchPages.remove(searchPages.size() - 1);
- }
-
public ICachedPage getPage() {
return page;
}
@@ -104,9 +76,4 @@ public class DiskBTreeRangeSearchCursor extends
BTreeRangeSearchCursor {
return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId,
pageId), false);
}
- @Override
- public void doClose() throws HyracksDataException {
- super.doClose();
- searchPages.clear();
- }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
index cd0a2d3..86ebe03 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
@@ -44,19 +44,23 @@ public class FieldPrefixSlotManager implements
IPrefixSlotManager {
private BTreeFieldPrefixNSMLeafFrame frame;
private MultiComparator cmp;
+ @Override
public int decodeFirstSlotField(int slot) {
return (slot & 0xFF000000) >>> 24;
}
+ @Override
public int decodeSecondSlotField(int slot) {
return slot & 0x00FFFFFF;
}
+ @Override
public int encodeSlotFields(int firstField, int secondField) {
return ((firstField & 0x000000FF) << 24) | (secondField & 0x00FFFFFF);
}
// returns prefix slot number, or TUPLE_UNCOMPRESSED of no match was found
+ @Override
public int findPrefix(ITupleReference tuple, ITreeIndexTupleReference
framePrefixTuple)
throws HyracksDataException {
int prefixMid;
@@ -194,30 +198,37 @@ public class FieldPrefixSlotManager implements
IPrefixSlotManager {
}
}
+ @Override
public int getPrefixSlotStartOff() {
return buf.capacity() - slotSize;
}
+ @Override
public int getPrefixSlotEndOff() {
return buf.capacity() - slotSize * frame.getPrefixTupleCount();
}
+ @Override
public int getTupleSlotStartOff() {
return getPrefixSlotEndOff() - slotSize;
}
+ @Override
public int getTupleSlotEndOff() {
return buf.capacity() - slotSize * (frame.getPrefixTupleCount() +
frame.getTupleCount());
}
+ @Override
public int getSlotSize() {
return slotSize;
}
+ @Override
public void setSlot(int offset, int value) {
frame.getBuffer().putInt(offset, value);
}
+ @Override
public int insertSlot(int slot, int tupleOff) {
int slotNum = decodeSecondSlotField(slot);
if (slotNum == ERROR_INDICATOR) {
@@ -241,14 +252,17 @@ public class FieldPrefixSlotManager implements
IPrefixSlotManager {
}
}
+ @Override
public int getPrefixSlotOff(int tupleIndex) {
return getPrefixSlotStartOff() - tupleIndex * slotSize;
}
+ @Override
public int getTupleSlotOff(int tupleIndex) {
return getTupleSlotStartOff() - tupleIndex * slotSize;
}
+ @Override
public void setPrefixSlot(int tupleIndex, int slot) {
buf.putInt(getPrefixSlotOff(tupleIndex), slot);
}
@@ -276,6 +290,12 @@ public class FieldPrefixSlotManager implements
IPrefixSlotManager {
}
@Override
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
public int getSlotStartOff() {
throw new UnsupportedOperationException("Not implemented.");
}
@@ -295,7 +315,9 @@ public class FieldPrefixSlotManager implements
IPrefixSlotManager {
throw new UnsupportedOperationException("Not implemented.");
}
+ @Override
public void setMultiComparator(MultiComparator cmp) {
this.cmp = cmp;
}
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
index cd58387..3da722d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
@@ -29,6 +29,9 @@ public interface ISlotManager {
public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy)
throws HyracksDataException;
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException;
+
public int getGreatestKeyIndicator();
public int getErrorIndicator();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 142e879..fae0d75 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -92,7 +92,7 @@ public abstract class IndexSearchOperatorNodePushable extends
AbstractUnaryInput
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
protected boolean failed = false;
- private IOperatorStats stats;
+ protected IOperatorStats stats;
// Used when the result of the search operation callback needs to be
passed.
protected boolean appendSearchCallbackProceedResult;
@@ -341,7 +341,7 @@ public abstract class IndexSearchOperatorNodePushable
extends AbstractUnaryInput
}
}
- private void writeTupleToOutput(ITupleReference tuple) throws IOException {
+ protected void writeTupleToOutput(ITupleReference tuple) throws
IOException {
try {
for (int i = 0; i < tuple.getFieldCount(); i++) {
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i),
tuple.getFieldLength(i));
@@ -393,7 +393,7 @@ public abstract class IndexSearchOperatorNodePushable
extends AbstractUnaryInput
* is used by ITupleFilter
*
*/
- private static class ReferenceFrameTupleReference implements
IFrameTupleReference {
+ protected static class ReferenceFrameTupleReference implements
IFrameTupleReference {
private ITupleReference tuple;
public IFrameTupleReference reset(ITupleReference tuple) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
new file mode 100644
index 0000000..23be280
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class LSMBTreeBatchPointSearchOperatorDescriptor extends
BTreeSearchOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public
LSMBTreeBatchPointSearchOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
boolean highKeyInclusive,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean
retainInput, boolean retainMissing,
+ IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
+ int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
ITupleFilterFactory tupleFilterFactory,
+ long outputLimit) {
+ super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive,
highKeyInclusive, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, false, tupleFilterFactory, outputLimit,
false, null, null);
+ }
+
+ @Override
+ public LSMBTreeBatchPointSearchOperatorNodePushable
createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
+ return new LSMBTreeBatchPointSearchOperatorNodePushable(ctx, partition,
+ recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, tupleFilterFactory,
+ outputLimit);
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
new file mode 100644
index 0000000..596f4b0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import
org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+
+public class LSMBTreeBatchPointSearchOperatorNodePushable extends
BTreeSearchOperatorNodePushable {
+
+ private final int[] keyFields;
+
+ public LSMBTreeBatchPointSearchOperatorNodePushable(IHyracksTaskContext
ctx, int partition,
+ RecordDescriptor inputRecDesc, int[] lowKeyFields, int[]
highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive, int[] minFilterKeyFields, int[]
maxFilterKeyFields,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean
retainInput, boolean retainMissing,
+ IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit) throws
HyracksDataException {
+ super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive,
+ minFilterKeyFields, maxFilterKeyFields, indexHelperFactory,
retainInput, retainMissing,
+ missingWriterFactory, searchCallbackFactory, false,
tupleFilterFactory, outputLimit, false, null, null);
+ this.keyFields = lowKeyFields;
+ }
+
+ @Override
+ protected IIndexCursor createCursor() throws HyracksDataException {
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ return new LSMBTreeBatchPointSearchCursor(lsmAccessor.getOpContext());
+ }
+
+ @Override
+ protected ISearchPredicate createSearchPredicate() {
+ ITreeIndex treeIndex = (ITreeIndex) index;
+ lowKeySearchCmp =
+ highKeySearchCmp =
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
+ return new BatchPredicate(accessor, lowKeySearchCmp, keyFields,
minFilterFieldIndexes, maxFilterFieldIndexes);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ if (accessor.getTupleCount() > 0) {
+ BatchPredicate batchPred = (BatchPredicate) searchPred;
+ batchPred.reset(accessor);
+ try {
+ indexAccessor.search(cursor, batchPred);
+ writeSearchResults();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ cursor.close();
+ }
+ }
+ }
+
+ protected void writeSearchResults() throws IOException {
+ long matchingTupleCount = 0;
+ LSMBTreeBatchPointSearchCursor batchCursor =
(LSMBTreeBatchPointSearchCursor) cursor;
+ int tupleIndex = 0;
+ while (cursor.hasNext()) {
+ cursor.next();
+ matchingTupleCount++;
+ ITupleReference tuple = cursor.getTuple();
+ if (tupleFilter != null &&
!tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
+ continue;
+ }
+ tb.reset();
+
+ if (retainInput && retainMissing) {
+ appendMissingTuple(tupleIndex, batchCursor.getKeyIndex());
+ }
+
+ tupleIndex = batchCursor.getKeyIndex();
+
+ if (retainInput) {
+ frameTuple.reset(accessor, tupleIndex);
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ dos.write(frameTuple.getFieldData(i),
frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ tb.addFieldEndOffset();
+ }
+ }
+ writeTupleToOutput(tuple);
+ FrameUtils.appendToWriter(writer, appender,
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ if (outputLimit >= 0 && ++outputCount >= outputLimit) {
+ finished = true;
+ break;
+ }
+ }
+ stats.getTupleCounter().update(matchingTupleCount);
+
+ }
+
+ private void appendMissingTuple(int start, int end) throws
HyracksDataException {
+ for (int i = start; i < end; i++) {
+ FrameUtils.appendConcatToWriter(writer, appender, accessor, i,
nonMatchTupleBuild.getFieldEndOffsets(),
+ nonMatchTupleBuild.getByteArray(), 0,
nonMatchTupleBuild.getSize());
+ }
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java
new file mode 100644
index 0000000..8ab6fb1
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.impls.DiskBTreePointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+
+/**
+ * This cursor performs point searches for each batch of search keys.
+ * Assumption: the search keys must be sorted into the increasing order.
+ *
+ */
+public class LSMBTreeBatchPointSearchCursor extends LSMBTreePointSearchCursor {
+
+ public LSMBTreeBatchPointSearchCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx);
+ }
+
+ @Override
+ public boolean doHasNext() throws HyracksDataException {
+ BatchPredicate batchPred = (BatchPredicate) predicate;
+ while (!foundTuple && batchPred.hasNext()) {
+ batchPred.next();
+ if (foundIn >= 0) {
+ btreeCursors[foundIn].close();
+ foundIn = -1;
+ }
+ foundTuple = super.doHasNext();
+ }
+ return foundTuple;
+ }
+
+ @Override
+ public void doNext() throws HyracksDataException {
+ foundTuple = false;
+ }
+
+ @Override
+ protected boolean isSearchCandidate(int componentIndex) throws
HyracksDataException {
+ if (!super.isSearchCandidate(componentIndex)) {
+ return false;
+ }
+ // check filters
+ ITupleReference minFilterKey = predicate.getMinFilterTuple();
+ ITupleReference maxFileterKey = predicate.getMaxFilterTuple();
+ boolean filtered = minFilterKey != null && maxFileterKey != null;
+ return !filtered ||
operationalComponents.get(componentIndex).getLSMComponentFilter().satisfy(minFilterKey,
+ maxFileterKey, opCtx.getFilterCmp());
+ }
+
+ @Override
+ protected void closeCursors() throws HyracksDataException {
+ super.closeCursors();
+ if (btreeCursors != null) {
+ // clear search states of btree cursors
+ for (int i = 0; i < numBTrees; ++i) {
+ if (btreeCursors[i] != null) {
+ if (btreeCursors[i] instanceof DiskBTreePointSearchCursor)
{
+ ((DiskBTreePointSearchCursor)
btreeCursors[i]).clearSearchState();
+ }
+ }
+ }
+ }
+ }
+
+ public int getKeyIndex() {
+ return ((BatchPredicate) predicate).getKeyIndex();
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index c376262..d4903d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -44,23 +44,24 @@ import org.apache.hyracks.storage.common.ISearchPredicate;
public class LSMBTreePointSearchCursor extends EnforcedIndexCursor implements
ILSMIndexCursor {
- private ITreeIndexCursor[] btreeCursors;
- private final ILSMIndexOperationContext opCtx;
- private ISearchOperationCallback searchCallback;
- private RangePredicate predicate;
- private boolean includeMutableComponent;
- private int numBTrees;
+ protected ITreeIndexCursor[] btreeCursors;
+ protected final ILSMIndexOperationContext opCtx;
+ protected ISearchOperationCallback searchCallback;
+ protected RangePredicate predicate;
+ protected boolean includeMutableComponent;
+ protected int numBTrees;
private BTreeAccessor[] btreeAccessors;
- private BloomFilter[] bloomFilters;
- private ILSMHarness lsmHarness;
+ protected BloomFilter[] bloomFilters;
+ protected ILSMHarness lsmHarness;
private boolean nextHasBeenCalled;
- private boolean foundTuple;
- private int foundIn = -1;
- private ITupleReference frameTuple;
- private List<ILSMComponent> operationalComponents;
- private boolean resultOfSearchCallbackProceed = false;
+ protected boolean foundTuple;
+ protected int foundIn = -1;
+ protected ITupleReference frameTuple;
+ protected List<ILSMComponent> operationalComponents;
+ protected boolean resultOfSearchCallbackProceed = false;
- private final long[] hashes = BloomFilter.createHashArray();
+ protected final long[] hashes = BloomFilter.createHashArray();
+ protected boolean hashComputed = false;
public LSMBTreePointSearchCursor(ILSMIndexOperationContext opCtx) {
this.opCtx = opCtx;
@@ -73,9 +74,10 @@ public class LSMBTreePointSearchCursor extends
EnforcedIndexCursor implements IL
} else if (foundTuple) {
return true;
}
+ hashComputed = false;
boolean reconciled = false;
for (int i = 0; i < numBTrees; ++i) {
- if (bloomFilters[i] != null &&
!bloomFilters[i].contains(predicate.getLowKey(), hashes)) {
+ if (!isSearchCandidate(i)) {
continue;
}
btreeAccessors[i].search(btreeCursors[i], predicate);
@@ -141,12 +143,27 @@ public class LSMBTreePointSearchCursor extends
EnforcedIndexCursor implements IL
return false;
}
+ protected boolean isSearchCandidate(int componentIndex) throws
HyracksDataException {
+ if (bloomFilters[componentIndex] != null) {
+ if (!hashComputed) {
+ // all bloom filters share the same hash function
+ // only compute it once for better performance
+
bloomFilters[componentIndex].computeHashes(predicate.getLowKey(), hashes);
+ hashComputed = true;
+ }
+ return bloomFilters[componentIndex].contains(hashes);
+ } else {
+ return true;
+ }
+ }
+
@Override
public void doClose() throws HyracksDataException {
try {
closeCursors();
nextHasBeenCalled = false;
foundTuple = false;
+ hashComputed = false;
} finally {
if (lsmHarness != null) {
lsmHarness.endSearch(opCtx);
@@ -196,7 +213,7 @@ public class LSMBTreePointSearchCursor extends
EnforcedIndexCursor implements IL
if (btreeAccessors[i] == null) {
btreeAccessors[i] =
btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
+ btreeCursors[i] = btreeAccessors[i].createPointCursor(false,
false);
} else {
// re-use
btreeAccessors[i].reset(btree,
NoOpIndexAccessParameters.INSTANCE);
@@ -205,6 +222,7 @@ public class LSMBTreePointSearchCursor extends
EnforcedIndexCursor implements IL
}
nextHasBeenCalled = false;
foundTuple = false;
+ hashComputed = false;
}
private void destroyAndNullifyCursorAtIndex(int i) throws
HyracksDataException {
@@ -259,7 +277,7 @@ public class LSMBTreePointSearchCursor extends
EnforcedIndexCursor implements IL
return null;
}
- private void closeCursors() throws HyracksDataException {
+ protected void closeCursors() throws HyracksDataException {
if (btreeCursors != null) {
for (int i = 0; i < numBTrees; ++i) {
if (btreeCursors[i] != null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index c2644c1..b7eb115 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -132,7 +132,7 @@ public abstract class LSMRTreeAbstractCursor extends
EnforcedIndexCursor impleme
}
if (btreeCursors[i] == null) {
// need to create a new one
- btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
+ btreeCursors[i] = btreeAccessors[i].createPointCursor(false,
false);
} else {
// close
btreeCursors[i].close();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
index 22ca4b9..69faad3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -90,6 +90,12 @@ public class UnorderedSlotManager extends
AbstractSlotManager {
}
@Override
+ public int findTupleIndex(ITupleReference searchKey,
ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) {
+ throw new UnsupportedOperationException("Stateful search is not
supported by UnorderedSlotManager");
+ }
+
+ @Override
public int insertSlot(int tupleIndex, int tupleOff) {
int slotOff = getSlotEndOff() - slotSize;
setSlot(slotOff, tupleOff);
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
index c2a69e1..2210372 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
@@ -101,6 +101,6 @@ public class DiskBTreePointSearchCursorTest extends
IIndexCursorTest {
@Override
protected IIndexCursor createCursor(IIndexAccessor accessor) {
- return ((BTreeAccessor) accessor).createPointCursor(false);
+ return ((BTreeAccessor) accessor).createPointCursor(false, false);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
index 15774c9..c240a73 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
@@ -105,20 +105,22 @@ public class DiskBTreeSearchCursorTest extends
BTreeSearchCursorTest {
insertBTree(keys, btree);
// forward searches
- Assert.assertTrue(performBatchLookups(keys, btree, leafFrame,
interiorFrame, minSearchKey, maxSearchKey));
+ Assert.assertTrue(
+ performBatchLookups(keys, btree, leafFrame, interiorFrame,
minSearchKey, maxSearchKey, false));
+ Assert.assertTrue(performBatchLookups(keys, btree, leafFrame,
interiorFrame, minSearchKey, maxSearchKey, true));
btree.deactivate();
btree.destroy();
}
private boolean performBatchLookups(ArrayList<Integer> keys, BTree btree,
IBTreeLeafFrame leafFrame,
- IBTreeInteriorFrame interiorFrame, int minKey, int maxKey) throws
Exception {
+ IBTreeInteriorFrame interiorFrame, int minKey, int maxKey, boolean
stateful) throws Exception {
ArrayList<Integer> results = new ArrayList<>();
ArrayList<Integer> expectedResults = new ArrayList<>();
BTreeAccessor indexAccessor = btree.createAccessor(
new IndexAccessParameters(TestOperationCallback.INSTANCE,
TestOperationCallback.INSTANCE));
- IIndexCursor pointCursor = indexAccessor.createPointCursor(false);
+ IIndexCursor pointCursor = indexAccessor.createPointCursor(false,
stateful);
try {
for (int i = minKey; i < maxKey; i++) {
results.clear();
@@ -144,18 +146,18 @@ public class DiskBTreeSearchCursorTest extends
BTreeSearchCursorTest {
if (results.size() == expectedResults.size()) {
for (int k = 0; k < results.size(); k++) {
if (!results.get(k).equals(expectedResults.get(k))) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("DIFFERENT RESULTS AT: i=" + i + "
k=" + k);
- LOGGER.info(results.get(k) + " " +
expectedResults.get(k));
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("DIFFERENT RESULTS AT: i=" + i +
" k=" + k);
+ LOGGER.error(results.get(k) + " " +
expectedResults.get(k));
}
return false;
}
}
} else {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("UNEQUAL NUMBER OF RESULTS AT: i=" + i);
- LOGGER.info("RESULTS: " + results.size());
- LOGGER.info("EXPECTED RESULTS: " +
expectedResults.size());
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("UNEQUAL NUMBER OF RESULTS AT: i=" + i);
+ LOGGER.error("RESULTS: " + results.size());
+ LOGGER.error("EXPECTED RESULTS: " +
expectedResults.size());
}
return false;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java
new file mode 100644
index 0000000..82aeed2
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.cursor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.IntegerBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.TestOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.test.IIndexCursorTest;
+import org.apache.hyracks.storage.am.lsm.btree.LSMBTreeExamplesTest;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import
org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeOpContext;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class LSMBTreeBatchPointSearchCursorTest extends IIndexCursorTest {
+ public static final int FIELD_COUNT = 2;
+ public static final ITypeTraits[] TYPE_TRAITS = {
IntegerPointable.TYPE_TRAITS, IntegerPointable.TYPE_TRAITS };
+ @SuppressWarnings("rawtypes")
+ public static final ISerializerDeserializer[] FIELD_SERDES =
+ { IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE };
+ public static final int KEY_FIELD_COUNT = 1;
+ public static final IBinaryComparatorFactory[] CMP_FACTORIES = {
IntegerBinaryComparatorFactory.INSTANCE };
+ public static final int[] BLOOM_FILTER_KEY_FIELDS = { 0 };
+ public static final Random RND = new Random(50);
+
+ private static final LSMBTreeTestHarness harness = new
LSMBTreeTestHarness();
+ private static LSMBTree lsmBtree;
+ private static LSMBTreeOpContext opCtx;
+
+ @BeforeClass
+ public static void setup() throws HyracksDataException {
+ harness.setUp();
+ lsmBtree = LSMBTreeExamplesTest.createTreeIndex(harness, TYPE_TRAITS,
CMP_FACTORIES, BLOOM_FILTER_KEY_FIELDS,
+ null, null, null, null);
+ lsmBtree.create();
+ lsmBtree.activate();
+ insertData(lsmBtree);
+ }
+
+ @AfterClass
+ public static void teardown() throws HyracksDataException {
+ try {
+ lsmBtree.deactivate();
+ lsmBtree.destroy();
+ } finally {
+ harness.tearDown();
+ }
+ }
+
+ @Override
+ protected List<ISearchPredicate> createSearchPredicates() throws Exception
{
+ IFrame frame = new VSizeFrame(harness.getHyracksTastContext());
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(frame, true);
+ MultiComparator keyCmp = null;
+ for (int i = 0; i < 10; i++) {
+ // Build low key.
+ ArrayTupleBuilder lowKeyTb = new
ArrayTupleBuilder(KEY_FIELD_COUNT);
+ ArrayTupleReference lowKey = new ArrayTupleReference();
+ TupleUtils.createIntegerTuple(lowKeyTb, lowKey, -100 + (i * 50));
+ appender.append(lowKey);
+ if (keyCmp == null) {
+ keyCmp = BTreeUtils.getSearchMultiComparator(CMP_FACTORIES,
lowKey);
+ }
+ }
+ IFrameTupleAccessor accessor =
+ new FrameTupleAccessor(new
RecordDescriptor(Arrays.copyOf(FIELD_SERDES, KEY_FIELD_COUNT)));
+ accessor.reset(frame.getBuffer());
+ BatchPredicate predicate = new BatchPredicate(accessor, keyCmp, null,
null, null);
+ return Collections.singletonList(predicate);
+ }
+
+ @Override
+ protected IIndexCursor createCursor(IIndexAccessor accessor) {
+ opCtx = lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE);
+ return new LSMBTreeBatchPointSearchCursor(opCtx);
+ }
+
+ @Override
+ protected void open(IIndexAccessor accessor, IIndexCursor cursor,
ISearchPredicate predicate)
+ throws HyracksDataException {
+ opCtx.reset();
+ opCtx.setOperation(IndexOperation.SEARCH);
+ lsmBtree.getOperationalComponents(opCtx);
+ opCtx.getSearchInitialState().reset(predicate,
opCtx.getComponentHolder());
+ cursor.open(opCtx.getSearchInitialState(), predicate);
+ }
+
+ @Override
+ protected IIndexAccessor createAccessor() throws Exception {
+ return lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ }
+
+ public static void insertData(ITreeIndex lsmBtree) throws
HyracksDataException {
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(FIELD_COUNT);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ IndexAccessParameters actx =
+ new IndexAccessParameters(TestOperationCallback.INSTANCE,
TestOperationCallback.INSTANCE);
+ IIndexAccessor indexAccessor = lsmBtree.createAccessor(actx);
+ try {
+ int numInserts = 10000;
+ for (int i = 0; i < numInserts; i++) {
+ int f0 = RND.nextInt() % numInserts;
+ int f1 = 5;
+ TupleUtils.createIntegerTuple(tb, tuple, f0, f1);
+ try {
+ indexAccessor.insert(tuple);
+ } catch (HyracksDataException e) {
+ if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
+ e.printStackTrace();
+ throw e;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ } finally {
+ indexAccessor.destroy();
+ }
+ }
+}