http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java index 18121e0..13543e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java @@ -65,7 +65,7 @@ public interface ILSMMemoryComponent extends ILSMComponent { /** * request the component to be active */ - void activate(); + void requestActivation(); /** * Set the component state @@ -74,4 +74,31 @@ public interface ILSMMemoryComponent extends ILSMComponent { * the new state */ void setState(ComponentState state); + + /** + * Allocates memory to this component, creates and activates it + * + * @throws HyracksDataException + */ + void allocate() throws HyracksDataException; + + /** + * Deactivates the memory component, destroys it, and deallocates its memory + * + * @throws HyracksDataException + */ + void deallocate() throws HyracksDataException; + + /** + * Test method + * TODO: Get rid of it + * + * @throws HyracksDataException + */ + void validate() throws HyracksDataException; + + /** + * @return the size of the memory component + */ + long getSize(); }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 4386d52..280cc52 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -25,18 +25,27 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; +import org.apache.hyracks.storage.common.MultiComparator; public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent { private final DiskComponentMetadata metadata; + private final AbstractLSMIndex lsmIndex; - public AbstractLSMDiskComponent(IMetadataPageManager mdPageManager, ILSMComponentFilter filter) { + public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager, + ILSMComponentFilter filter) { super(filter); + this.lsmIndex = lsmIndex; state = ComponentState.READABLE_UNWRITABLE; metadata = new DiskComponentMetadata(mdPageManager); } @Override + public AbstractLSMIndex getLsmIndex() { + return lsmIndex; + } + + @Override public boolean threadEnter(LSMOperationType opType, 
boolean isMutableComponent) { if (state == ComponentState.INACTIVE) { throw new IllegalStateException("Trying to enter an inactive disk component"); @@ -110,4 +119,82 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl //TODO: do we need to throw an exception when ID is not found? return new LSMDiskComponentId(minID, maxID); } + + /** + * Mark the component as valid + * + * @param persist + * whether the call should force data to disk before returning + * @throws HyracksDataException + */ + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + ComponentUtils.markAsValid(getMetadataHolder(), persist); + } + + @Override + public void activate(boolean createNewComponent) throws HyracksDataException { + if (createNewComponent) { + getIndex().create(); + } + getIndex().activate(); + if (getLSMComponentFilter() != null && !createNewComponent) { + getLsmIndex().getFilterManager().readFilter(getLSMComponentFilter(), getMetadataHolder()); + } + } + + @Override + public void deactivateAndDestroy() throws HyracksDataException { + getIndex().deactivate(); + getIndex().destroy(); + } + + @Override + public void destroy() throws HyracksDataException { + getIndex().destroy(); + } + + @Override + public void deactivate() throws HyracksDataException { + getIndex().deactivate(); + } + + @Override + public void deactivateAndPurge() throws HyracksDataException { + getIndex().deactivate(); + getIndex().purge(); + } + + @Override + public void validate() throws HyracksDataException { + getIndex().validate(); + } + + @Override + public IChainedComponentBulkLoader createFilterBulkLoader() throws HyracksDataException { + return new FilterBulkLoader(getLSMComponentFilter(), getMetadataHolder(), getLsmIndex().getFilterManager(), + getLsmIndex().getTreeFields(), getLsmIndex().getFilterFields(), + MultiComparator.create(getLSMComponentFilter().getFilterCmpFactories())); + } + + @Override + public IChainedComponentBulkLoader 
createIndexBulkLoader(float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException { + return new LSMIndexBulkLoader( + getIndex().createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex)); + } + + @Override + public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent) + throws HyracksDataException { + ChainedLSMDiskComponentBulkLoader chainedBulkLoader = + new ChainedLSMDiskComponentBulkLoader(this, cleanupEmptyComponent); + if (withFilter && getLsmIndex().getFilterFields() != null) { + chainedBulkLoader.addBulkLoader(createFilterBulkLoader()); + } + chainedBulkLoader + .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex)); + return chainedBulkLoader; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java deleted file mode 100644 index 351f619..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.storage.am.lsm.common.impls; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; -import org.apache.hyracks.storage.am.common.api.ITreeIndex; -import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter; -import org.apache.hyracks.storage.common.IIndex; -import org.apache.hyracks.storage.common.IIndexBulkLoader; -import org.apache.hyracks.storage.common.MultiComparator; - -public abstract class AbstractLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader { - protected final ILSMDiskComponent component; - - protected final IIndexBulkLoader indexBulkLoader; - protected final 
IIndexBulkLoader bloomFilterBuilder; - - protected final ILSMComponentFilterManager filterManager; - protected final PermutingTupleReference indexTuple; - protected final PermutingTupleReference filterTuple; - protected final MultiComparator filterCmp; - protected final boolean cleanupEmptyComponent; - - protected boolean cleanedUpArtifacts = false; - protected boolean isEmptyComponent = true; - protected boolean endedBloomFilterLoad = false; - - //with filter - public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec, - float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, - boolean cleanupEmptyComponent, ILSMComponentFilterManager filterManager, int[] indexFields, - int[] filterFields, MultiComparator filterCmp) throws HyracksDataException { - this.component = component; - this.indexBulkLoader = - getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); - if (bloomFilterSpec != null) { - this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - } else { - this.bloomFilterBuilder = null; - } - this.cleanupEmptyComponent = cleanupEmptyComponent; - if (filterManager != null) { - this.filterManager = filterManager; - this.indexTuple = new PermutingTupleReference(indexFields); - this.filterTuple = new PermutingTupleReference(filterFields); - this.filterCmp = filterCmp; - } else { - this.filterManager = null; - this.indexTuple = null; - this.filterTuple = null; - this.filterCmp = null; - } - } - - @Override - public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - indexBulkLoader.add(t); - if (bloomFilterBuilder != null) { - bloomFilterBuilder.add(t); - } - updateFilter(tuple); - - } catch 
(Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - @Override - public void delete(ITupleReference tuple) throws HyracksDataException { - ILSMTreeTupleWriter tupleWriter = - (ILSMTreeTupleWriter) ((AbstractTreeIndexBulkLoader) indexBulkLoader).getLeafFrame().getTupleWriter(); - tupleWriter.setAntimatter(true); - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - indexBulkLoader.add(t); - - updateFilter(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } finally { - tupleWriter.setAntimatter(false); - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - @Override - public void abort() throws HyracksDataException { - if (indexBulkLoader != null) { - indexBulkLoader.abort(); - } - if (bloomFilterBuilder != null) { - bloomFilterBuilder.abort(); - } - - } - - @Override - public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (bloomFilterBuilder != null && !endedBloomFilterLoad) { - bloomFilterBuilder.end(); - endedBloomFilterLoad = true; - } - - //use filter - if (filterManager != null && component.getLSMComponentFilter() != null) { - filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component)); - } - indexBulkLoader.end(); - - if (isEmptyComponent && cleanupEmptyComponent) { - cleanupArtifacts(); - } - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - if (bloomFilterBuilder != null && !endedBloomFilterLoad) { - bloomFilterBuilder.abort(); - endedBloomFilterLoad = true; - } - getIndex(component).deactivate(); - getIndex(component).destroy(); - if (bloomFilterBuilder != null) { - getBloomFilter(component).deactivate(); - getBloomFilter(component).destroy(); - } - } - } - - protected void updateFilter(ITupleReference tuple) throws HyracksDataException { - if 
(filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - } - - /** - * TreeIndex is used to hold the filter tuple values - * - * @param component - * @return - */ - protected ITreeIndex getTreeIndex(ILSMDiskComponent component) { - return (ITreeIndex) getIndex(component); - } - - protected abstract IIndex getIndex(ILSMDiskComponent component); - - protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component); - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java deleted file mode 100644 index fc7310c..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.storage.am.lsm.common.impls; - -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; -import org.apache.hyracks.storage.am.common.api.ITreeIndex; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.common.IIndexBulkLoader; -import org.apache.hyracks.storage.common.MultiComparator; - -public abstract class AbstractLSMDiskComponentWithBuddyBulkLoader extends AbstractLSMDiskComponentBulkLoader { - - protected final IIndexBulkLoader buddyBTreeBulkLoader; - - //with filter - public AbstractLSMDiskComponentWithBuddyBulkLoader(ILSMDiskComponent component, - BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, - boolean checkIfEmptyIndex, boolean cleanupEmptyComponent, ILSMComponentFilterManager filterManager, - int[] indexFields, int[] filterFields, MultiComparator filterCmp) throws HyracksDataException { - super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, - cleanupEmptyComponent, filterManager, indexFields, filterFields, filterCmp); - - // BuddyBTree must be created even if it could be empty, - // since without it the component is not considered as valid. 
- buddyBTreeBulkLoader = - getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); - } - - @Override - public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - indexBulkLoader.add(t); - - updateFilter(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - @Override - public void delete(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - buddyBTreeBulkLoader.add(t); - if (bloomFilterBuilder != null) { - bloomFilterBuilder.add(t); - } - - updateFilter(tuple); - } catch (HyracksDataException e) { - //deleting a key multiple times is OK - if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) { - cleanupArtifacts(); - throw e; - } - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - @Override - public void abort() throws HyracksDataException { - super.abort(); - if (buddyBTreeBulkLoader != null) { - buddyBTreeBulkLoader.abort(); - } - } - - @Override - public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (bloomFilterBuilder != null && !endedBloomFilterLoad) { - bloomFilterBuilder.end(); - endedBloomFilterLoad = true; - } - - //use filter - if (filterManager != null && component.getLSMComponentFilter() != null) { - filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component)); - } - indexBulkLoader.end(); - buddyBTreeBulkLoader.end(); - - if (isEmptyComponent && cleanupEmptyComponent) { - cleanupArtifacts(); - } - } - } - - @Override - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - 
cleanedUpArtifacts = true; - if (bloomFilterBuilder != null && !endedBloomFilterLoad) { - bloomFilterBuilder.abort(); - endedBloomFilterLoad = true; - } - getIndex(component).deactivate(); - getIndex(component).destroy(); - - getBuddyBTree(component).deactivate(); - getBuddyBTree(component).destroy(); - - if (bloomFilterBuilder != null) { - getBloomFilter(component).deactivate(); - getBloomFilter(component).destroy(); - } - } - } - - protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component); - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index dc64f9b..50c7720 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; @@ -43,6 +44,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; 
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; @@ -90,11 +92,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected final AtomicBoolean[] flushRequests; protected boolean memoryComponentsAllocated = false; protected ITracer tracer; + // Factory for creating on-disk index components during flush and merge. + protected final ILSMDiskComponentFactory componentFactory; + // Factory for creating on-disk index components during bulkload. + protected final ILSMDiskComponentFactory bulkLoadComponentFactory; public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches, IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, - ILSMIOOperationCallback ioOpCallback, ILSMComponentFilterFrameFactory filterFrameFactory, + ILSMIOOperationCallback ioOpCallback, ILSMDiskComponentFactory componentFactory, + ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager, int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) { this.ioManager = ioManager; @@ -104,6 +111,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex { this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate; this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; + this.componentFactory 
= componentFactory; + this.bulkLoadComponentFactory = bulkLoadComponentFactory; this.ioOpCallback.setNumOfMutableComponents(virtualBufferCaches.size()); this.filterHelper = filterHelper; this.filterFrameFactory = filterFrameFactory; @@ -127,13 +136,17 @@ public abstract class AbstractLSMIndex implements ILSMIndex { // The constructor used by external indexes public AbstractLSMIndex(IIOManager ioManager, IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, - ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, boolean durable) { + ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, + ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory, + boolean durable) { this.ioManager = ioManager; this.diskBufferCache = diskBufferCache; this.fileManager = fileManager; this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate; this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; + this.componentFactory = componentFactory; + this.bulkLoadComponentFactory = bulkLoadComponentFactory; this.durable = durable; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; @@ -169,11 +182,14 @@ public abstract class AbstractLSMIndex implements ILSMIndex { isActive = true; } - protected void loadDiskComponents() throws HyracksDataException { + private void loadDiskComponents() throws HyracksDataException { diskComponents.clear(); List<LSMComponentFileReferences> validFileReferences = fileManager.cleanupAndGetValidFiles(); - for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) { - ILSMDiskComponent component = loadComponent(lsmComonentFileReference); + for (LSMComponentFileReferences lsmComponentFileReferences : validFileReferences) { + ILSMDiskComponent component = + 
createDiskComponent(componentFactory, lsmComponentFileReferences.getInsertIndexFileReference(), + lsmComponentFileReferences.getDeleteIndexFileReference(), + lsmComponentFileReferences.getBloomFilterFileReference(), false); diskComponents.add(component); } } @@ -192,7 +208,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { flushMemoryComponent(); } deactivateDiskComponents(); - deactivateMemoryComponents(); + deallocateMemoryComponents(); isActive = false; } @@ -208,17 +224,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } } - protected void deactivateDiskComponents() throws HyracksDataException { - List<ILSMDiskComponent> immutableComponents = diskComponents; - for (ILSMDiskComponent c : immutableComponents) { - deactivateDiskComponent(c); + private void deactivateDiskComponents() throws HyracksDataException { + for (ILSMDiskComponent c : diskComponents) { + c.deactivateAndPurge(); } } - protected void deactivateMemoryComponents() throws HyracksDataException { + private void deallocateMemoryComponents() throws HyracksDataException { if (memoryComponentsAllocated) { for (ILSMMemoryComponent c : memoryComponents) { - deactivateMemoryComponent(c); + c.deallocate(); } memoryComponentsAllocated = false; } @@ -233,10 +248,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex { fileManager.deleteDirs(); } - protected void destroyDiskComponents() throws HyracksDataException { - List<ILSMDiskComponent> immutableComponents = diskComponents; - for (ILSMDiskComponent c : immutableComponents) { - destroyDiskComponent(c); + private void destroyDiskComponents() throws HyracksDataException { + for (ILSMDiskComponent c : diskComponents) { + c.destroy(); } } @@ -245,26 +259,30 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (!isActive) { throw HyracksDataException.create(ErrorCode.CANNOT_CLEAR_INACTIVE_INDEX); } - clearMemoryComponents(); - clearDiskComponents(); + resetMemoryComponents(); + 
deactivateAndDestroyDiskComponents(); } - private void clearDiskComponents() throws HyracksDataException { + private void deactivateAndDestroyDiskComponents() throws HyracksDataException { for (ILSMDiskComponent c : diskComponents) { - clearDiskComponent(c); + c.deactivateAndDestroy(); } diskComponents.clear(); } - protected void clearMemoryComponents() throws HyracksDataException { - if (memoryComponentsAllocated) { + private void resetMemoryComponents() throws HyracksDataException { + if (memoryComponentsAllocated && memoryComponents != null) { for (ILSMMemoryComponent c : memoryComponents) { - clearMemoryComponent(c); + c.reset(); } } } @Override + public void purge() throws HyracksDataException { + } + + @Override public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException { List<ILSMDiskComponent> immutableComponents = diskComponents; List<ILSMComponent> operationalComponents = ctx.getComponentHolder(); @@ -376,16 +394,37 @@ public abstract class AbstractLSMIndex implements ILSMIndex { return createBulkLoader(fillLevel, verifyInput, numElementsHint); } + public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) + throws HyracksDataException { + return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint); + } + + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); + return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), + componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true); + } + + protected ILSMDiskComponent createDiskComponent(ILSMDiskComponentFactory factory, FileReference insertFileReference, + FileReference deleteIndexFileReference, FileReference bloomFilterFileRef, boolean createComponent) + throws HyracksDataException { + 
ILSMDiskComponent component = factory.createComponent(this, + new LSMComponentFileReferences(insertFileReference, deleteIndexFileReference, bloomFilterFileRef)); + component.activate(createComponent); + return component; + } + @Override public final synchronized void allocateMemoryComponents() throws HyracksDataException { if (!isActive) { throw HyracksDataException.create(ErrorCode.CANNOT_ALLOCATE_MEMORY_FOR_INACTIVE_INDEX); } - if (memoryComponentsAllocated) { + if (memoryComponentsAllocated || memoryComponents == null) { return; } for (ILSMMemoryComponent c : memoryComponents) { - allocateMemoryComponent(c); + c.allocate(); } memoryComponentsAllocated = true; } @@ -410,7 +449,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { @Override public void changeMutableComponent() { currentMutableComponentId.set((currentMutableComponentId.get() + 1) % memoryComponents.size()); - memoryComponents.get(currentMutableComponentId.get()).activate(); + memoryComponents.get(currentMutableComponentId.get()).requestActivation(); } @Override @@ -507,8 +546,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex { Set<String> componentFiles = new HashSet<>(); //get set of files to be replicated for each component - for (ILSMComponent lsmComponent : lsmComponents) { - componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent)); + for (ILSMDiskComponent lsmComponent : lsmComponents) { + componentFiles.addAll(lsmComponent.getLSMComponentPhysicalFiles()); } ReplicationExecutionType executionType; @@ -595,11 +634,11 @@ public abstract class AbstractLSMIndex implements ILSMIndex { public final void validate() throws HyracksDataException { if (memoryComponentsAllocated) { for (ILSMMemoryComponent c : memoryComponents) { - validateMemoryComponent(c); + c.validate(); } } for (ILSMDiskComponent c : diskComponents) { - validateDiskComponent(c); + c.validate(); } } @@ -607,7 +646,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { public long 
getMemoryAllocationSize() { long size = 0; for (ILSMMemoryComponent c : memoryComponents) { - size += getMemoryComponentSize(c); + size += c.getSize(); } return size; } @@ -628,31 +667,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex { : doMerge(operation); } - public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); - - protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; - - protected abstract IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) - throws HyracksDataException; - - protected abstract ILSMDiskComponent loadComponent(LSMComponentFileReferences refs) throws HyracksDataException; - - protected abstract void deactivateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; - - protected abstract void deactivateDiskComponent(ILSMDiskComponent c) throws HyracksDataException; - - protected abstract void destroyDiskComponent(ILSMDiskComponent c) throws HyracksDataException; - - protected abstract void clearDiskComponent(ILSMDiskComponent c) throws HyracksDataException; - - protected abstract void clearMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; - - protected abstract void validateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; - - protected abstract void validateDiskComponent(ILSMDiskComponent c) throws HyracksDataException; - - protected abstract long getMemoryComponentSize(ILSMMemoryComponent c); - protected abstract LSMComponentFileReferences getMergeFileReferences(ILSMDiskComponent firstComponent, ILSMDiskComponent lastComponent) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 1ee68d9..a4fe35c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -25,6 +25,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent { @@ -178,7 +179,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im } @Override - public void activate() { + public void requestActivation() { requestedToBeActive = true; } @@ -198,12 +199,20 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im } @Override - public void reset() throws HyracksDataException { + public final void reset() throws HyracksDataException { isModified.set(false); metadata.reset(); if (filter != null) { filter.reset(); } + doReset(); + } + + protected void doReset() throws HyracksDataException { + getIndex().deactivate(); + getIndex().destroy(); + getIndex().create(); + getIndex().activate(); } @Override @@ -215,4 +224,37 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im public MemoryComponentMetadata getMetadata() { return metadata; } + + 
@Override + public final void allocate() throws HyracksDataException { + ((IVirtualBufferCache) getIndex().getBufferCache()).open(); + doAllocate(); + } + + protected void doAllocate() throws HyracksDataException { + getIndex().create(); + getIndex().activate(); + } + + @Override + public final void deallocate() throws HyracksDataException { + doDeallocate(); + getIndex().getBufferCache().close(); + } + + protected void doDeallocate() throws HyracksDataException { + getIndex().deactivate(); + getIndex().destroy(); + } + + @Override + public void validate() throws HyracksDataException { + getIndex().validate(); + } + + @Override + public long getSize() { + IBufferCache virtualBufferCache = getIndex().getBufferCache(); + return virtualBufferCache.getNumPages() * (long) virtualBufferCache.getPageSize(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java new file mode 100644 index 0000000..0dcf349 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.common.IIndexBulkLoader; + +/** + * Chained bulk-load stage that forwards every added tuple to a wrapped {@link IIndexBulkLoader} + * and ignores deletes. + * NOTE(review): the class name suggests the wrapped loader builds a bloom filter; that is not + * visible in this class, confirm against the caller. + */ +public class BloomFilterBulkLoader implements IChainedComponentBulkLoader { + + private final IIndexBulkLoader bulkLoader; + + /** + * @param bulkLoader + * the loader that receives every tuple passed to {@link #add(ITupleReference)} + */ + public BloomFilterBulkLoader(IIndexBulkLoader bulkLoader) { + this.bulkLoader = bulkLoader; + } + + /** + * Set once the wrapped loader has been ended by {@link #end()} or aborted by + * {@link #cleanupArtifacts()}, so that neither of the two repeats its call. + */ + private boolean endedBloomFilterLoad = false; + + /** + * Passes the tuple to the wrapped loader. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference add(ITupleReference tuple) throws HyracksDataException { + bulkLoader.add(tuple); + return tuple; + } + + /** + * Does nothing with the wrapped loader. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference delete(ITupleReference tuple) throws HyracksDataException { + //Noop + return tuple; + } + + /** + * Ends the wrapped loader, unless it was already ended or cleaned up. + */ + @Override + public void end() throws HyracksDataException { + if (!endedBloomFilterLoad) { + bulkLoader.end(); + endedBloomFilterLoad = true; + } + } + + /** + * Aborts the wrapped loader unconditionally. This does not set {@code endedBloomFilterLoad}, + * so a later {@link #cleanupArtifacts()} call aborts the wrapped loader a second time. + */ + @Override + public void abort() throws HyracksDataException { + bulkLoader.abort(); + } + + /** + * Aborts the wrapped loader, unless it was already ended or cleaned up. + */ + @Override + public void cleanupArtifacts() throws HyracksDataException { + if (!endedBloomFilterLoad) { + bulkLoader.abort(); + endedBloomFilterLoad = true; + } + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java new file mode 100644 index 0000000..f38614c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; + +/** + * Runs a chain of {@link IChainedComponentBulkLoader} stages during the bulk load of one LSM disk + * component. Stages are invoked in the order they were added, and each stage receives the tuple + * returned by the stage before it. + */ +public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader { + + private List<IChainedComponentBulkLoader> bulkloaderChain = new LinkedList<>(); + /** True until the first add or delete has passed through the whole chain without an exception. */ + private boolean isEmptyComponent = true; + /** True once the stages have been asked to clean up; guards against cleaning them up twice. */ + private boolean cleanedUpArtifacts = false; + private final ILSMDiskComponent diskComponent; + private final boolean cleanupEmptyComponent; + + /** + * @param diskComponent + * the disk component being loaded; {@link #cleanupArtifacts()} deactivates and destroys it + * @param cleanupEmptyComponent + * whether {@link #end()} should clean up when no tuple was added or deleted + */ + public ChainedLSMDiskComponentBulkLoader(ILSMDiskComponent diskComponent, boolean cleanupEmptyComponent) { + this.diskComponent = diskComponent; + this.cleanupEmptyComponent = cleanupEmptyComponent; + } + + /** + * Appends a stage to the end of the chain. + */ + public void addBulkLoader(IChainedComponentBulkLoader bulkloader) { + bulkloaderChain.add(bulkloader); + } + + /** + * Feeds the tuple through every stage in order, handing each stage the tuple returned by the + * previous one. If any stage throws, {@link #cleanupArtifacts()} is called and the exception is + * rethrown. On success the component is marked as non-empty. + */ + @Override + public void add(ITupleReference tuple) throws HyracksDataException { + try { + ITupleReference t = tuple; + for (IChainedComponentBulkLoader lsmBulkloader : bulkloaderChain) { + t = lsmBulkloader.add(t); + } + } catch (Exception e) { + cleanupArtifacts(); + throw e; + } + if (isEmptyComponent) { + isEmptyComponent = false; + } + } + + /** + * Same as {@link #add(ITupleReference)}, but calls {@code delete} on every stage. A successful + * delete also marks the component as non-empty. + */ + @Override + public void delete(ITupleReference tuple) throws HyracksDataException { + try { + ITupleReference t = tuple; + for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { + t = lsmOperation.delete(t); + } + } catch (Exception e) { + cleanupArtifacts(); + throw e; + } + if (isEmptyComponent) { + isEmptyComponent = false; + } + } + + /** + * Asks every stage to clean up, at most once over the lifetime of this loader. The disk + * component's {@code deactivateAndDestroy()} is outside that guard and therefore runs on every + * call of this method. + */ + @Override + public void cleanupArtifacts() throws HyracksDataException { + if 
(!cleanedUpArtifacts) { + cleanedUpArtifacts = true; + for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { + lsmOperation.cleanupArtifacts(); + } + } + diskComponent.deactivateAndDestroy(); + } + + /** + * Ends every stage in order, unless the artifacts were already cleaned up (in which case this + * is a no-op). If nothing was added or deleted and {@code cleanupEmptyComponent} is set, the + * artifacts are cleaned up after the stages have ended. + */ + @Override + public void end() throws HyracksDataException { + if (!cleanedUpArtifacts) { + for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { + lsmOperation.end(); + } + if (isEmptyComponent && cleanupEmptyComponent) { + cleanupArtifacts(); + } + } + } + + /** + * Aborts every stage in order. Does not call {@link #cleanupArtifacts()}. + */ + @Override + public void abort() throws HyracksDataException { + for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { + lsmOperation.abort(); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java index 0134dca..f2751bf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -18,12 +18,17 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.Collections; +import java.util.Set; + import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IIndex; public class EmptyComponent implements ILSMDiskComponent { public static final EmptyComponent INSTANCE = new EmptyComponent(); @@ -53,6 +58,11 @@ public class EmptyComponent implements ILSMDiskComponent { } @Override + public IIndex getIndex() { + return null; + } + + @Override public DiskComponentMetadata getMetadata() { return EmptyDiskComponentMetadata.INSTANCE; } @@ -78,8 +88,65 @@ public class EmptyComponent implements ILSMDiskComponent { } @Override + public AbstractLSMIndex getLsmIndex() { + return null; + } + + @Override + public ITreeIndex getMetadataHolder() { + return null; + } + + @Override + public Set<String> getLSMComponentPhysicalFiles() { + return Collections.emptySet(); + } + + @Override public void markAsValid(boolean persist) throws HyracksDataException { // No Op } + @Override + public void activate(boolean createNewComponent) throws HyracksDataException { + // No Op + } + + @Override + public void deactivateAndDestroy() throws HyracksDataException { + // No Op + } + + @Override + public void deactivate() throws HyracksDataException { + // No Op + } + + @Override + public void deactivateAndPurge() throws HyracksDataException { + // No Op + } + + @Override + public void validate() throws HyracksDataException { + // No Op + } + + @Override + public IChainedComponentBulkLoader createFilterBulkLoader() throws HyracksDataException { + return null; + } + + @Override + public IChainedComponentBulkLoader createIndexBulkLoader(float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException { + return null; + } + + @Override + public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean 
cleanupEmptyComponent) + throws HyracksDataException { + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index 2f65b18..11f2441 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -135,7 +135,7 @@ public class ExternalIndexHarness extends LSMHarness { lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, opType); } - ((ILSMDiskComponent) c).destroy(); + ((ILSMDiskComponent) c).deactivateAndDestroy(); break; default: break; @@ -348,7 +348,7 @@ public class ExternalIndexHarness extends LSMHarness { componentsToBeReplicated.add(diskComponent); lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null); } - diskComponent.destroy(); + diskComponent.deactivateAndDestroy(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java new file mode 100644 index 0000000..7359d2b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.common.MultiComparator; + +/** + * Chained bulk-load stage that updates an {@link ILSMComponentFilter} from every tuple it sees and + * hands downstream stages a permuted view of that tuple built from {@code indexFields}. The filter + * is written out through the filter manager when the load ends. + */ +public class FilterBulkLoader implements IChainedComponentBulkLoader { + + private final ILSMComponentFilter filter; + private final ITreeIndex treeIndex; + protected final ILSMComponentFilterManager filterManager; + protected final PermutingTupleReference indexTuple; + protected final PermutingTupleReference filterTuple; + protected final MultiComparator filterCmp; + + /** + * @param filter + * the filter updated by every add and delete + * @param treeIndex + * passed to {@code filterManager.writeFilter} together with the filter in {@link #end()} + * @param filterManager + * used to write the filter in {@link #end()} + * @param indexFields + * field permutation for the tuple view returned to downstream stages + * @param filterFields + * field permutation for the tuple view used to update the filter + * @param filterCmp + * comparator passed to {@code filter.update} + */ + public FilterBulkLoader(ILSMComponentFilter filter, ITreeIndex treeIndex, + ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) { + this.filter = filter; + this.treeIndex = treeIndex; + this.filterManager = filterManager; + this.indexTuple = new PermutingTupleReference(indexFields); + this.filterTuple = new PermutingTupleReference(filterFields); + this.filterCmp = filterCmp; + } + + /** + * Updates the filter from the tuple exactly as {@link #add(ITupleReference)} does. + * + * @return the shared {@code indexTuple} view, reset onto the given tuple + */ + @Override + public ITupleReference delete(ITupleReference tuple) throws HyracksDataException { + indexTuple.reset(tuple); + updateFilter(tuple); + return indexTuple; + } + + /** + * Nothing to clean up in this stage. + */ + @Override + public void cleanupArtifacts() throws HyracksDataException { + //Noop + } + + /** + * Resets the index-field view onto the tuple and updates the filter from the tuple. + * + * @return the shared {@code indexTuple} view; the same object is returned by every call + */ + @Override + public ITupleReference add(ITupleReference tuple) throws HyracksDataException { + indexTuple.reset(tuple); + updateFilter(tuple); + return indexTuple; + } + + /** + * Hands the filter and the tree index to {@code filterManager.writeFilter}. + */ + @Override + public void end() throws HyracksDataException { + filterManager.writeFilter(filter, treeIndex); + } + + /** + * Nothing to abort in this stage; the filter is not written. + */ + @Override + public void abort() throws HyracksDataException { + //Noop + 
} + + /** + * Resets the filter-field view onto the tuple and updates the filter with it using + * {@code filterCmp}. + */ + private void updateFilter(ITupleReference tuple) throws HyracksDataException { + filterTuple.reset(tuple); + filter.update(filterTuple, filterCmp); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java new file mode 100644 index 0000000..90ef127 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +/** + * One stage in a chain of bulkloaders used while bulkloading a component. The tuple returned by + * {@link #add(ITupleReference)} or {@link #delete(ITupleReference)} is the input of the next + * stage in the chain. + */ +public interface IChainedComponentBulkLoader { + /** + * Adds a tuple to the bulkloaded component + * + * @param tuple + * the tuple to add + * @return Potentially modified tuple, which is used as an input for downstream bulkloaders + * @throws HyracksDataException + */ + ITupleReference add(ITupleReference tuple) throws HyracksDataException; + + /** + * Deletes a tuple (i.e. appends anti-matter tuple or deleted-key tuple) from the bulkloaded component + * + * @param tuple + * the tuple to delete + * @return Potentially modified tuple, which is used as an input for downstream bulkloaders + * @throws HyracksDataException + */ + ITupleReference delete(ITupleReference tuple) throws HyracksDataException; + + /** + * Correctly finalizes the bulkloading process and releases all resources + * + * @throws HyracksDataException + */ + void end() throws HyracksDataException; + + /** + * Aborts the bulkloading process without releasing associated resources + * + * @throws HyracksDataException + */ + void abort() throws HyracksDataException; + + /** + * Releases all resources allocated during the bulkloading process + * + * @throws HyracksDataException + */ + void cleanupArtifacts() throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java new file mode 100644 index 
0000000..4fb2919 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.common.IIndexBulkLoader; + +/** + * Chained bulk-load stage backed by two loaders: added tuples go to the primary loader, and + * deleted tuples are added to the buddy B-tree loader. + */ +public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader { + + private final IIndexBulkLoader bulkLoader; + private final IIndexBulkLoader buddyBTreeBulkLoader; + + /** + * @param bulkLoader + * receives every tuple passed to {@link #add(ITupleReference)} + * @param buddyBTreeBulkLoader + * receives every tuple passed to {@link #delete(ITupleReference)} + */ + public IndexWithBuddyBulkLoader(IIndexBulkLoader bulkLoader, IIndexBulkLoader buddyBTreeBulkLoader) { + this.bulkLoader = bulkLoader; + this.buddyBTreeBulkLoader = buddyBTreeBulkLoader; + } + + /** + * Records the delete by adding the tuple to the buddy B-tree loader. A + * {@code DUPLICATE_KEY} error is ignored; any other error calls {@link #cleanupArtifacts()} + * (a no-op in this class) and is rethrown. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference delete(ITupleReference tuple) throws HyracksDataException { + try { + buddyBTreeBulkLoader.add(tuple); + } catch (HyracksDataException e) { + //deleting a key multiple times is OK + if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) { + cleanupArtifacts(); + throw e; + } + } + return tuple; + } + + /** + * Nothing to clean up in this stage. + */ + @Override + public void cleanupArtifacts() throws HyracksDataException { + //Noop + } + + /** + * Adds the tuple to the primary loader. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference add(ITupleReference tuple) throws HyracksDataException { + bulkLoader.add(tuple); + return tuple; + } + + /** + * Ends the primary loader, then the buddy B-tree loader. If the first call throws, the second + * is not attempted. + */ + @Override + public void end() throws HyracksDataException { + bulkLoader.end(); + buddyBTreeBulkLoader.end(); + } + + /** + * Aborts the primary loader, then the buddy B-tree loader. If the first call throws, the + * second is not attempted. + */ + @Override + public void abort() throws HyracksDataException { + bulkLoader.abort(); + buddyBTreeBulkLoader.abort(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 1bdd250..c8be9b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -356,8 +356,8 @@ public class LSMHarness implements ILSMHarness { lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false, ReplicationOperation.DELETE, opType); } - for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) { - ((AbstractLSMDiskComponent) c).destroy(); + for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) { + c.deactivateAndDestroy(); } } catch (Throwable e) { if (LOGGER.isLoggable(Level.WARNING)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java new file mode 100644 index 0000000..84857f4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter; +import org.apache.hyracks.storage.common.IIndexBulkLoader; + +/** + * Chained bulk-load stage that wraps a single {@link IIndexBulkLoader}. Adds are forwarded as they + * are; deletes are forwarded with the tuple writer switched to antimatter mode. + */ +public class LSMIndexBulkLoader implements IChainedComponentBulkLoader { + private final IIndexBulkLoader bulkLoader; + + /** + * @param bulkLoader + * the wrapped loader; {@link #delete(ITupleReference)} casts it to + * {@code AbstractTreeIndexBulkLoader} + */ + public LSMIndexBulkLoader(IIndexBulkLoader bulkLoader) { + this.bulkLoader = bulkLoader; + } + + /** + * Adds the tuple to the wrapped loader while the leaf frame's tuple writer has its antimatter + * flag set, and clears the flag again in a {@code finally} block. The wrapped loader is cast to + * {@code AbstractTreeIndexBulkLoader} and its tuple writer to {@code ILSMTreeTupleWriter}, so + * both must be of those types. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference delete(ITupleReference tuple) throws HyracksDataException { + ILSMTreeTupleWriter tupleWriter = + (ILSMTreeTupleWriter) ((AbstractTreeIndexBulkLoader) bulkLoader).getLeafFrame().getTupleWriter(); + tupleWriter.setAntimatter(true); + try { + bulkLoader.add(tuple); + } finally { + tupleWriter.setAntimatter(false); + } + return tuple; + } + + /** + * Nothing to clean up in this stage. + */ + @Override + public void cleanupArtifacts() throws HyracksDataException { + //Noop + } + + /** + * Adds the tuple to the wrapped loader. + * + * @return the same tuple, unchanged + */ + @Override + public ITupleReference add(ITupleReference tuple) throws HyracksDataException { + bulkLoader.add(tuple); + return tuple; + } + + /** + * Ends the wrapped loader. + */ + @Override + public void end() throws HyracksDataException { + bulkLoader.end(); + } + + /** + * Aborts the wrapped loader. + */ + @Override + public void abort() throws 
HyracksDataException { + bulkLoader.abort(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java new file mode 100644 index 0000000..8befee1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IIndexBulkLoader; + +/** + * {@link IIndexBulkLoader} that loads tuples into a newly created disk component of an + * {@link AbstractLSMIndex} and, once the load has ended with a non-empty component, hands that + * component to the index's harness. + */ +public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { + private final AbstractLSMIndex lsmIndex; + private final ILSMDiskComponent component; + private final IIndexBulkLoader componentBulkLoader; + + /** + * Creates the bulk-load target component and a bulk loader on it. The trailing + * {@code false, true, true} arguments of {@code createBulkLoader} are positional; going by the + * {@code createBulkLoader} override in {@code EmptyComponent} they are + * {@code checkIfEmptyIndex}, {@code withFilter} and {@code cleanupEmptyComponent}. + * + * @param lsmIndex + * the index that creates the target component and later receives it + * @param fillFactor + * passed through to the component's bulk loader + * @param verifyInput + * passed through to the component's bulk loader + * @param numElementsHint + * passed through to the component's bulk loader + * @throws HyracksDataException + * if creating the target component or its bulk loader fails + */ + public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput, + long numElementsHint) throws HyracksDataException { + this.lsmIndex = lsmIndex; + // Note that by using a flush target file name, we state that the + // new bulk loaded component is "newer" than any other merged component. + this.component = lsmIndex.createBulkLoadTarget(); + this.componentBulkLoader = + component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true); + } + + /** + * Adds the tuple to the component's bulk loader. + */ + @Override + public void add(ITupleReference tuple) throws HyracksDataException { + componentBulkLoader.add(tuple); + } + + /** + * Ends the component's bulk loader. If the component then reports a size greater than zero, + * the IO operation callback is notified as for a {@code FLUSH} and the component is added + * through the harness; otherwise neither call is made. + */ + @Override + public void end() throws HyracksDataException { + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + //TODO(amoudi): Ensure bulk load follows the same lifecycle as other operations (Flush, Merge, etc.), 
+ //then afterOperation should be called from the harness as well + //https://issues.apache.org/jira/browse/ASTERIXDB-1764 + lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); + lsmIndex.getLsmHarness().addBulkLoadedComponent(component); + } + } + + /** + * Aborts the component's bulk loader. + */ + @Override + public void abort() throws HyracksDataException { + componentBulkLoader.abort(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java index a26e14c..6f583b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java @@ -41,13 +41,4 @@ public interface IInPlaceInvertedIndex extends IInvertedIndex { */ void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey, IIndexOperationContext ictx) throws HyracksDataException; - - /** - * Purge the index files out of the buffer cache. - * Can only be called if the caller is absolutely sure the files don't contain dirty pages - * - * @throws HyracksDataException - * if the index is active - */ - void purge() throws HyracksDataException; }
