Repository: asterixdb Updated Branches: refs/heads/master 745379ee1 -> 1ded1616b
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 13ff420..3618737 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -71,6 +71,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.file.IFileMapProvider; public class LSMRTree extends AbstractLSMRTree { @@ -173,20 +174,39 @@ public class LSMRTree extends AbstractLSMRTree { RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor(false); SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null); memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); + LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBTreeTarget(), flushOp.getBloomFilterTarget(), true); - RTree diskRTree = component.getRTree(); - IIndexBulkLoader rTreeBulkloader; - ITreeIndexCursor cursor; + //count the number of tuples in the buddy btree + ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() + .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); + IIndexCursor btreeCountingCursor = ((BTreeAccessor) memBTreeAccessor).createCountingSearchCursor(); + memBTreeAccessor.search(btreeCountingCursor, btreeNullPredicate); + long numBTreeTuples = 0L; + try { + while (btreeCountingCursor.hasNext()) { + btreeCountingCursor.next(); + ITupleReference countTuple = btreeCountingCursor.getTuple(); + numBTreeTuples = IntegerPointable.getInteger(countTuple.getFieldData(0), countTuple.getFieldStart(0)); + } + } finally { + btreeCountingCursor.close(); + } + + IIndexBulkLoader componentBulkLoader = + createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false); + + ITreeIndexCursor cursor; IBinaryComparatorFactory[] linearizerArray = { linearizer }; TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(), linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), flushingComponent.getRTree().getBufferCache(), comparatorFields); + // BulkLoad the tuples from the in-memory tree into the new disk // RTree. - boolean isEmpty = true; try { while (rtreeScanCursor.hasNext()) { @@ -199,7 +219,6 @@ public class LSMRTree extends AbstractLSMRTree { } rTreeTupleSorter.sort(); - rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false); cursor = rTreeTupleSorter; if (!isEmpty) { @@ -207,54 +226,24 @@ public class LSMRTree extends AbstractLSMRTree { while (cursor.hasNext()) { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - rTreeBulkloader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); } } - rTreeBulkloader.end(); - - ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() - .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); - IIndexCursor btreeCountingCursor = ((BTreeAccessor) memBTreeAccessor).createCountingSearchCursor(); - memBTreeAccessor.search(btreeCountingCursor, btreeNullPredicate); - long numBTreeTuples = 0L; - try { - while (btreeCountingCursor.hasNext()) { - btreeCountingCursor.next(); - ITupleReference countTuple = btreeCountingCursor.getTuple(); - numBTreeTuples = IntegerPointable.getInteger(countTuple.getFieldData(0), countTuple.getFieldStart(0)); - } - } finally { - btreeCountingCursor.close(); - } - - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - + // scan the memory BTree IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false); memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); - BTree diskBTree = component.getBTree(); - - // BulkLoad the tuples from the in-memory tree into the new disk BTree. - IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples, false); - IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - // scan the memory BTree try { while (btreeScanCursor.hasNext()) { btreeScanCursor.next(); ITupleReference frameTuple = btreeScanCursor.getTuple(); - bTreeBulkloader.add(frameTuple); - builder.add(frameTuple); + ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(frameTuple); } } finally { btreeScanCursor.close(); - builder.end(); } if (component.getLSMComponentFilter() != null) { @@ -266,7 +255,8 @@ public class LSMRTree extends AbstractLSMRTree { } // Note. If we change the filter to write to metadata object, we don't need the if block above flushingComponent.getMetadata().copy(component.getMetadata()); - bTreeBulkloader.end(); + + componentBulkLoader.end(); return component; } @@ -282,40 +272,46 @@ public class LSMRTree extends AbstractLSMRTree { LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true); + IIndexBulkLoader componentBulkLoader; + // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that // lsmHarness.endSearch() is called once when the r-trees have been merged. - BTree btree = mergedComponent.getBTree(); - IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false); if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents .get(diskComponents.size() - 1)) { // Keep the deleted tuples since the oldest disk component is not included in the merge operation - LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx); - search(opCtx, btreeCursor, rtreeSearchPred); - long numElements = 0L; for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() .getNumElements(); } + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false); - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - + LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx); + search(opCtx, btreeCursor, rtreeSearchPred); try { while (btreeCursor.hasNext()) { btreeCursor.next(); ITupleReference tuple = btreeCursor.getTuple(); - btreeBulkLoader.add(tuple); - builder.add(tuple); + ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(tuple); } } finally { btreeCursor.close(); - builder.end(); } + } else { + //no buddy-btree needed + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false); + } + + //search old rtree components + try { + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); + } + } finally { + cursor.close(); } if (mergedComponent.getLSMComponentFilter() != null) { @@ -327,19 +323,8 @@ public class LSMRTree extends AbstractLSMRTree { getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree()); } - btreeBulkLoader.end(); - IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false); - try { - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - bulkLoader.add(frameTuple); - } - } finally { - cursor.close(); - } - bulkLoader.end(); + componentBulkLoader.end(); return mergedComponent; } @@ -358,6 +343,25 @@ public class LSMRTree extends AbstractLSMRTree { } @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + BloomFilterSpecification bloomFilterSpec = null; + if (numElementsHint > 0) { + int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); + bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + } + if (withFilter && filterFields != null) { + return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields, + MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, numElementsHint, checkIfEmptyIndex); + } + } + + @Override public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { return new LSMRTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java index edc3e7d..fbdd37a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java @@ -20,110 +20,41 @@ package org.apache.hyracks.storage.am.lsm.rtree.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; -import org.apache.hyracks.storage.common.MultiComparator; public class LSMRTreeBulkLoader implements IIndexBulkLoader { private final ILSMDiskComponent component; - private final IIndexBulkLoader bulkLoader; - private final IIndexBulkLoader buddyBTreeBulkloader; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - public final PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; private final LSMRTree lsmIndex; + private final IIndexBulkLoader componentBulkLoader; public LSMRTreeBulkLoader(LSMRTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { this.lsmIndex = lsmIndex; // Note that by using a flush target file name, we state that the // new bulk loaded tree is "newer" than any other merged tree. - component = lsmIndex.createBulkLoadTarget(); - bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - if (lsmIndex.getFilterFields() != null) { - indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + this.component = lsmIndex.createBulkLoadTarget(); + this.componentBulkLoader = + lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - bulkLoader.add(t); - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - - if (component.getLSMComponentFilter() != null) { - lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((LSMRTreeDiskComponent) component).getRTree()); - } - - bulkLoader.end(); - buddyBTreeBulkloader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); - lsmIndex.getLsmHarness().addBulkLoadedComponent(component); - } + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); + lsmIndex.getLsmHarness().addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (bulkLoader != null) { - bulkLoader.abort(); - } - if (buddyBTreeBulkloader != null) { - buddyBTreeBulkloader.abort(); - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - ((LSMRTreeDiskComponent) component).getRTree().deactivate(); - ((LSMRTreeDiskComponent) component).getRTree().destroy(); - ((LSMRTreeDiskComponent) component).getBTree().deactivate(); - ((LSMRTreeDiskComponent) component).getBTree().destroy(); - ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate(); - ((LSMRTreeDiskComponent) component).getBloomFilter().destroy(); - } + componentBulkLoader.abort(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java new file mode 100644 index 0000000..ff0a299 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.rtree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader { + + //with filter + public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, + ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp) + throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex) + throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBloomFilter(); + } + + @Override + protected IIndex getIndex(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getRTree(); + } + + @Override + protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBTree(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index a20e6f2..24c46d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; @@ -136,14 +135,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null); memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true); - RTree diskRTree = component.getRTree(); - - // scan the memory BTree - ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() - .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); - RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); - memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false); // Since the LSM-RTree is used as a secondary assumption, the // primary key will be the last comparator in the BTree comparators @@ -151,12 +143,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), flushingComponent.getRTree().getBufferCache(), comparatorFields); - TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(), - linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), - flushingComponent.getBTree().getBufferCache(), comparatorFields); - // BulkLoad the tuples from the in-memory tree into the new disk - // RTree. - boolean isEmpty = true; try { while (rtreeScanCursor.hasNext()) { @@ -171,6 +157,16 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { rTreeTupleSorter.sort(); } + // scan the memory BTree + ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() + .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); + RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); + memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(), + linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), + flushingComponent.getBTree().getBufferCache(), comparatorFields); + isEmpty = true; try { while (btreeScanCursor.hasNext()) { @@ -185,7 +181,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { bTreeTupleSorter.sort(); } - IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false); LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter, bTreeTupleSorter, comparatorFields, linearizerArray); cursor.open(null, null); @@ -195,7 +190,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - rTreeBulkloader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); @@ -209,8 +204,8 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree()); } flushingComponent.getMetadata().copy(component.getMetadata()); - rTreeBulkloader.end(); + componentBulkLoader.end(); return component; } @@ -225,13 +220,13 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { // Bulk load the tuples from all on-disk RTrees into the new RTree. LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), null, null, true); - RTree mergedRTree = component.getRTree(); - IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false); + + IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false); try { while (cursor.hasNext()) { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - bulkloader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); @@ -245,7 +240,8 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree()); } - bulkloader.end(); + + componentBulkLoader.end(); return component; } @@ -258,6 +254,20 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { } @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + if (withFilter && filterFields != null) { + return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, + filterFields, MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } + } + + @Override public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint); @@ -265,91 +275,35 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader { private final ILSMDiskComponent component; - private final IIndexBulkLoader bulkLoader; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - public final PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; + private final IIndexBulkLoader componentBulkLoader; public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { - // Note that by using a flush target file name, we state that the - // new bulk loaded tree is "newer" than any other merged tree. - component = createBulkLoadTarget(); - bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - - if (getFilterFields() != null) { - indexTuple = new PermutingTupleReference(getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + + componentBulkLoader = + createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - bulkLoader.add(t); - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - - if (component.getLSMComponentFilter() != null) { - getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((LSMRTreeDiskComponent) component).getRTree()); - } - bulkLoader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); - getLsmHarness().addBulkLoadedComponent(component); - } + + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); + lsmHarness.addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (bulkLoader != null) { - bulkLoader.abort(); - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - ((LSMRTreeDiskComponent) component).getRTree().deactivate(); - ((LSMRTreeDiskComponent) component).getRTree().destroy(); + if (componentBulkLoader != null) { + componentBulkLoader.abort(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java new file mode 100644 index 0000000..88a2e1e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.rtree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader { + + //with filter + public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBloomFilter(); + } + + @Override + protected ITreeIndex getIndex(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getRTree(); + } + +}
