http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index f6ebf19..25a4aea 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -78,33 +78,48 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { if (inputArity == 0) { - return createSourceInputPushRuntime(ctx); + return new SourcePushRuntime(ctx); } else { return createOneInputOneOutputPushRuntime(ctx, recordDescProvider); } } - private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx) { - return new AbstractUnaryOutputSourceOperatorNodePushable() { + private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable { + private final IHyracksTaskContext ctx; - @Override - public void initialize() throws HyracksDataException { - IFrameWriter startOfPipeline; - RecordDescriptor pipelineOutputRecordDescriptor = - outputArity > 0 ? 
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null; - PipelineAssembler pa = - new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor); - startOfPipeline = pa.assemblePipeline(writer, ctx); + public SourcePushRuntime(IHyracksTaskContext ctx) { + this.ctx = ctx; + } + + @Override + public void initialize() throws HyracksDataException { + IFrameWriter startOfPipeline; + RecordDescriptor pipelineOutputRecordDescriptor = + outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null; + PipelineAssembler pa = + new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor); + startOfPipeline = pa.assemblePipeline(writer, ctx); + HyracksDataException exception = null; + try { + startOfPipeline.open(); + } catch (Exception e) { + startOfPipeline.fail(); + exception = HyracksDataException.create(e); + } finally { try { - startOfPipeline.open(); - } catch (Exception e) { - startOfPipeline.fail(); - throw e; - } finally { startOfPipeline.close(); + } catch (Exception e) { + if (exception == null) { + exception = HyracksDataException.create(e); + } else { + exception.addSuppressed(e); + } } } - }; + if (exception != null) { + throw exception; + } + } } private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index d64858b..71ddbc0 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -49,8 +49,8 @@ public class HttpServer { // Constants private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024; private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024; - protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = new WriteBufferWaterMark( - LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK); + protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = + new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK); protected static final int RECEIVE_BUFFER_SIZE = 4096; protected static final int DEFAULT_NUM_EXECUTOR_THREADS = 16; protected static final int DEFAULT_REQUEST_QUEUE_SIZE = 256; @@ -92,8 +92,8 @@ public class HttpServer { long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE; LOGGER.log(Level.INFO, "The output direct memory budget for this server is " + directMemoryBudget + " bytes"); - long inputBudgetEstimate = (long) HttpServerInitializer.MAX_REQUEST_INITIAL_LINE_LENGTH - * (requestQueueSize + numExecutorThreads); + long inputBudgetEstimate = + (long) HttpServerInitializer.MAX_REQUEST_INITIAL_LINE_LENGTH * (requestQueueSize + numExecutorThreads); inputBudgetEstimate = inputBudgetEstimate * 2; 
LOGGER.log(Level.INFO, "The \"estimated\" input direct memory budget for this server is " + inputBudgetEstimate + " bytes"); @@ -111,7 +111,7 @@ public class HttpServer { doStart(); setStarted(); } catch (Throwable e) { // NOSONAR - LOGGER.log(Level.SEVERE, "Failure starting an Http Server", e); + LOGGER.log(Level.SEVERE, "Failure starting an Http Server with port: " + port, e); setFailed(e); throw e; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java index a859f68..673bd3b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java @@ -62,6 +62,6 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource { cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx), - ioOpCallbackFactory, durable, metadataPageManagerFactory); + ioOpCallbackFactory, durable, metadataPageManagerFactory, serviceCtx.getTracer()); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java index 9422253..7e44c63 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java @@ -65,6 +65,6 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource { typeTraits, cmpFactories, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx), - ioOpCallbackFactory, bloomFilterKeyFields, durable, metadataPageManagerFactory); + ioOpCallbackFactory, bloomFilterKeyFields, durable, metadataPageManagerFactory, serviceCtx.getTracer()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 7dc5939..5b6ff9e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -62,6 +62,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.util.trace.ITracer; /** * This is an lsm b-tree that does not have memory component and is modified @@ -94,18 +95,18 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMDiskComponentFactory transactionComponentFactory, double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, - ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable) { + ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, ITracer tracer) { super(ioManager, insertLeafFrameFactory, deleteLeafFrameFactory, bufferCache, fileManager, componentFactory, bulkLoadComponentFactory, bloomFilterFalsePositiveRate, cmpFactories, mergePolicy, opTracker, - ioScheduler, ioOpCallbackFactory, false, durable); + ioScheduler, ioOpCallbackFactory, false, durable, tracer); this.transactionComponentFactory = transactionComponentFactory; this.secondDiskComponents = new LinkedList<>(); this.interiorFrameFactory = interiorFrameFactory; } @Override - public ExternalIndexHarness getLsmHarness() { - return (ExternalIndexHarness) super.getLsmHarness(); + public ExternalIndexHarness 
getHarness() { + return (ExternalIndexHarness) super.getHarness(); } // The subsume merged components is overridden to account for: @@ -162,9 +163,9 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // This method creates the appropriate opContext for the targeted version public ExternalBTreeOpContext createOpContext(ISearchOperationCallback searchCallback, int targetVersion) { - return new ExternalBTreeOpContext(insertLeafFrameFactory, deleteLeafFrameFactory, searchCallback, + return new ExternalBTreeOpContext(this, insertLeafFrameFactory, deleteLeafFrameFactory, searchCallback, ((LSMBTreeWithBloomFilterDiskComponentFactory) componentFactory).getBloomFilterKeyFields().length, - cmpFactories, targetVersion, getLsmHarness()); + cmpFactories, targetVersion, getHarness(), tracer); } // The only reason to override the following method is that it uses a different context object @@ -194,7 +195,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { FileReference lastFile = lastBTree.getFileReference(); LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); - ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath())); @@ -259,7 +260,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { diskComponents.add(component); secondDiskComponents.add(component); } - getLsmHarness().indexFirstTimeActivated(); + getHarness().indexFirstTimeActivated(); } else { // This index has been opened before for (ILSMDiskComponent c : diskComponents) { @@ -309,7 +310,7 @@ public class ExternalBTree 
extends LSMBTree implements ITwoPCIndex { if (!isActive) { throw new HyracksDataException("Failed to clear the index since it is not activated."); } - getLsmHarness().indexClear(); + getHarness().indexClear(); for (ILSMDiskComponent c : diskComponents) { c.deactivateAndDestroy(); @@ -451,7 +452,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { component.markAsValid(durable); component.deactivate(); } else { - getLsmHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(component); } } } @@ -490,14 +491,14 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) { ExternalBTreeOpContext opCtx = createOpContext(iap.getSearchOperationCallback(), version); - return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + return new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); } @Override public ILSMIndexAccessor createAccessor(ISearchOperationCallback searchCallback, int targetIndexVersion) throws HyracksDataException { ExternalBTreeOpContext opCtx = createOpContext(searchCallback, targetIndexVersion); - return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + return new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); } @Override @@ -553,7 +554,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { component = createDiskComponent(componentFactory, componentFileReferences.getInsertIndexFileReference(), null, componentFileReferences.getBloomFilterFileReference(), false); } - getLsmHarness().addTransactionComponents(component); + getHarness().addTransactionComponents(component); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java ---------------------------------------------------------------------- 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java index f94c38a..b0c31ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java @@ -22,9 +22,11 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; public class ExternalBTreeOpContext extends AbstractLSMIndexOperationContext { private IBTreeLeafFrame insertLeafFrame; @@ -34,11 +36,11 @@ public class ExternalBTreeOpContext extends AbstractLSMIndexOperationContext { private final int targetIndexVersion; private LSMBTreeCursorInitialState searchInitialState; - public ExternalBTreeOpContext(ITreeIndexFrameFactory insertLeafFrameFactory, + public ExternalBTreeOpContext(ILSMIndex index, ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory, ISearchOperationCallback searchCallback, int numBloomFilterKeyFields, IBinaryComparatorFactory[] cmpFactories, int targetIndexVersion, - ILSMHarness lsmHarness) { - super(null, null, null, searchCallback, null); + 
ILSMHarness lsmHarness, ITracer tracer) { + super(index, null, null, null, searchCallback, null, tracer); if (cmpFactories != null) { this.cmp = MultiComparator.create(cmpFactories); } else { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index c495b69..c6ce339 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -66,6 +66,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.util.trace.ITracer; public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeIndex, ITwoPCIndex { @@ -89,9 +90,10 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd ILSMDiskComponentFactory bulkLoadComponentFactory, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, IBinaryComparatorFactory[] btreeCmpFactories, - IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean 
durable) { + IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean durable, + ITracer tracer) { super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, - ioScheduler, ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable); + ioScheduler, ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer); this.btreeCmpFactories = btreeCmpFactories; this.buddyBtreeCmpFactories = buddyBtreeCmpFactories; this.buddyBTreeFields = buddyBTreeFields; @@ -125,7 +127,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd diskComponents.add(component); secondDiskComponents.add(component); } - ((ExternalIndexHarness) getLsmHarness()).indexFirstTimeActivated(); + ((ExternalIndexHarness) getHarness()).indexFirstTimeActivated(); } else { // This index has been opened before or is brand new with no // components. It should also maintain the version pointer @@ -147,7 +149,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd if (!isActive) { throw new HyracksDataException("Failed to clear the index since it is not activated."); } - ((ExternalIndexHarness) getLsmHarness()).indexClear(); + ((ExternalIndexHarness) getHarness()).indexClear(); for (ILSMDiskComponent c : diskComponents) { c.deactivateAndDestroy(); // Remove from second list to avoid destroying twice @@ -182,7 +184,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { - return new LSMTreeIndexAccessor(getLsmHarness(), createOpContext(iap.getSearchOperationCallback(), version), + return new LSMTreeIndexAccessor(getHarness(), createOpContext(iap.getSearchOperationCallback(), version), ctx -> new LSMBTreeWithBuddySearchCursor(ctx, buddyBTreeFields)); } @@ -273,7 +275,7 @@ public class ExternalBTreeWithBuddy extends 
AbstractLSMIndex implements ITreeInd List<ILSMComponent> mergingComponents = ctx.getComponentHolder(); ITreeIndexCursor cursor = new LSMBTreeWithBuddySortedCursor(bctx, buddyBTreeFields); LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents); - ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), bctx, + ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), bctx, opCtx -> new LSMBTreeWithBuddySearchCursor(opCtx, buddyBTreeFields)); // Since we have two lists of components, to tell whether we need to @@ -296,9 +298,9 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd // This method creates the appropriate opContext for the targeted version public ExternalBTreeWithBuddyOpContext createOpContext(ISearchOperationCallback searchCallback, int targetVersion) { - return new ExternalBTreeWithBuddyOpContext(btreeCmpFactories, buddyBtreeCmpFactories, searchCallback, - targetVersion, getLsmHarness(), btreeInteriorFrameFactory, btreeLeafFrameFactory, - buddyBtreeLeafFrameFactory); + return new ExternalBTreeWithBuddyOpContext(this, btreeCmpFactories, buddyBtreeCmpFactories, searchCallback, + targetVersion, getHarness(), btreeInteriorFrameFactory, btreeLeafFrameFactory, + buddyBtreeLeafFrameFactory, tracer); } @Override @@ -528,7 +530,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd component.markAsValid(durable); component.deactivate(); } else { - getLsmHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(component); } } } @@ -564,7 +566,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd @Override public ILSMIndexAccessor createAccessor(ISearchOperationCallback searchCallback, int targetIndexVersion) throws HyracksDataException { - return new LSMTreeIndexAccessor(getLsmHarness(), createOpContext(searchCallback, targetIndexVersion), + return new LSMTreeIndexAccessor(getHarness(), 
createOpContext(searchCallback, targetIndexVersion), ctx -> new LSMBTreeWithBuddySearchCursor(ctx, buddyBTreeFields)); } @@ -609,7 +611,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd componentFileRefrences.getDeleteIndexFileReference(), componentFileRefrences.getBloomFilterFileReference(), false); } - ((ExternalIndexHarness) getLsmHarness()).addTransactionComponents(component); + ((ExternalIndexHarness) getHarness()).addTransactionComponents(component); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java index d5cd2e2..85d4ab2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java @@ -22,10 +22,12 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import 
org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationContext { private MultiComparator bTreeCmp; @@ -33,11 +35,12 @@ public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationCo private final int targetIndexVersion; private LSMBTreeWithBuddyCursorInitialState searchInitialState; - public ExternalBTreeWithBuddyOpContext(IBinaryComparatorFactory[] btreeCmpFactories, + public ExternalBTreeWithBuddyOpContext(ILSMIndex index, IBinaryComparatorFactory[] btreeCmpFactories, IBinaryComparatorFactory[] buddyBtreeCmpFactories, ISearchOperationCallback searchCallback, int targetIndexVersion, ILSMHarness lsmHarness, ITreeIndexFrameFactory btreeInteriorFrameFactory, - ITreeIndexFrameFactory btreeLeafFrameFactory, ITreeIndexFrameFactory buddyBtreeLeafFrameFactory) { - super(null, null, null, searchCallback, null); + ITreeIndexFrameFactory btreeLeafFrameFactory, ITreeIndexFrameFactory buddyBtreeLeafFrameFactory, + ITracer tracer) { + super(index, null, null, null, searchCallback, null, tracer); this.targetIndexVersion = targetIndexVersion; this.bTreeCmp = MultiComparator.create(btreeCmpFactories); this.buddyBTreeCmp = MultiComparator.create(buddyBtreeCmpFactories); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index e69aae1..6929530 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -125,9 +125,10 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory, double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, - ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, boolean durable) { + ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, boolean durable, + ITracer tracer) { super(ioManager, bufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, - ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable); + ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer); this.insertLeafFrameFactory = insertLeafFrameFactory; this.deleteLeafFrameFactory = deleteLeafFrameFactory; this.cmpFactories = cmpFactories; @@ -364,10 +365,11 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { public LSMBTreeOpContext createOpContext(IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback) { int numBloomFilterKeyFields = hasBloomFilter - ? ((LSMBTreeWithBloomFilterDiskComponentFactory) componentFactory).getBloomFilterKeyFields().length : 0; - return new LSMBTreeOpContext(memoryComponents, insertLeafFrameFactory, deleteLeafFrameFactory, + ? 
((LSMBTreeWithBloomFilterDiskComponentFactory) componentFactory).getBloomFilterKeyFields().length + : 0; + return new LSMBTreeOpContext(this, memoryComponents, insertLeafFrameFactory, deleteLeafFrameFactory, modificationCallback, searchCallback, numBloomFilterKeyFields, getTreeFields(), getFilterFields(), - getLsmHarness(), getFilterCmpFactories()); + getHarness(), getFilterCmpFactories(), tracer); } @Override @@ -376,7 +378,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } public ILSMIndexAccessor createAccessor(AbstractLSMIndexOperationContext opCtx) { - return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); + return new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java index 951ec81..9cab94e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java @@ -32,12 +32,14 @@ import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; public final class LSMBTreeOpContext extends AbstractLSMIndexOperationContext { @@ -62,11 +64,12 @@ public final class LSMBTreeOpContext extends AbstractLSMIndexOperationContext { private BTree.BTreeAccessor currentMutableBTreeAccessor; private BTreeOpContext currentMutableBTreeOpCtx; - public LSMBTreeOpContext(List<ILSMMemoryComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory, - ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields, - ILSMHarness lsmHarness, IBinaryComparatorFactory[] filterCmpFactories) { - super(btreeFields, filterFields, filterCmpFactories, searchCallback, modificationCallback); + public LSMBTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, + ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory, + IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, + int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields, ILSMHarness lsmHarness, + IBinaryComparatorFactory[] filterCmpFactories, ITracer tracer) { + super(index, btreeFields, filterFields, filterCmpFactories, searchCallback, modificationCallback, tracer); LSMBTreeMemoryComponent c = (LSMBTreeMemoryComponent) mutableComponents.get(0); IBinaryComparatorFactory cmpFactories[] = c.getIndex().getComparatorFactories(); if (cmpFactories[0] != null) { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java index 6cc8fbb..3e14fb9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java @@ -34,8 +34,10 @@ import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor; import org.apache.hyracks.storage.common.ICursorInitialState; import org.apache.hyracks.storage.common.IIndexCursor; @@ -47,10 +49,11 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { private final RangePredicate reusablePred; private ISearchOperationCallback searchCallback; - private RangePredicate predicate; + private BTreeAccessor[] 
btreeAccessors; private ArrayTupleBuilder tupleBuilder; private boolean canCallProceed = true; + private int tupleFromMemoryComponentCount = 0; public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) { this(opCtx, false); @@ -73,6 +76,9 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { outputElement = outputPriorityQueue.poll(); needPushElementIntoQueue = true; canCallProceed = false; + if (outputElement.getCursorIndex() == 0) { + tupleFromMemoryComponentCount++; + } } /** @@ -87,6 +93,15 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { */ @Override protected void checkPriorityQueue() throws HyracksDataException { + // Every SWITCH_COMPONENT_CYCLE calls, check if memory components need to be swapped with disk components + // We should do this regardless of the value of includeMutableComponent. This is because if the cursor + // of the memory component has gone past the end of the in memory component, then the includeMutableComponent + // will be set to false. Still, when that happens, we want to exit the memory component to allow it to be + // recycled and used for modifications. + if (hasNextCallCount >= SWITCH_COMPONENT_CYCLE) { + replaceMemoryComponentWithDiskComponentIfNeeded(); + hasNextCallCount = 0; + } while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) { if (!outputPriorityQueue.isEmpty()) { PriorityQueueElement queueHead = outputPriorityQueue.peek(); @@ -97,7 +112,7 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { if (!searchCallback.proceed(queueHead.getTuple())) { // In case proceed() fails and there is an in-memory component, // we can't simply use this element since there might be a change. 
- PriorityQueueElement mutableElement = removeMutable(outputPriorityQueue); + PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); if (mutableElement != null) { // Copies the current queue head if (tupleBuilder == null) { @@ -166,10 +181,7 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { } else { // If the previous tuple and the head tuple are different // the info of previous tuple is useless - if (needPushElementIntoQueue == true) { - pushIntoQueueFromCursorAndReplaceThisElement(outputElement); - needPushElementIntoQueue = false; - } + pushOutputElementIntoQueueIfNeeded(); canCallProceed = true; outputElement = null; } @@ -185,16 +197,128 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { } - private PriorityQueueElement removeMutable(PriorityQueue<PriorityQueueElement> outputPriorityQueue) { - // Scans the PQ for the mutable component's element and delete it - // since it can be changed. - // (i.e. we can't ensure that the element is the most current one.) + private void pushOutputElementIntoQueueIfNeeded() throws HyracksDataException { + if (needPushElementIntoQueue) { + pushIntoQueueFromCursorAndReplaceThisElement(outputElement); + needPushElementIntoQueue = false; + } + } + + private void replaceMemoryComponentWithDiskComponentIfNeeded() throws HyracksDataException { + int replaceFrom = replaceFrom(); + if (replaceFrom < 0) { + // no switch is needed, check if we need to re-do the search on the memory component. 
+ // searches and modifications compete on the pages of the memory component + // if the cursor on the memory component is not advancing, we re-do the operation in order + // to release the latches and allow modifications to proceed + redoMemoryComponentSearchIfNeeded(); + return; + } + opCtx.getIndex().getHarness().replaceMemoryComponentsWithDiskComponents(getOpCtx(), replaceFrom); + // redo the search on the new component + for (int i = replaceFrom; i < switchRequest.length; i++) { + if (switchRequest[i] && switchedElements[i] != null) { + copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(), + switchComponentTupleBuilders[i].getByteArray()); + reusablePred.setLowKey(copyTuple, true); + rangeCursors[i].reset(); + ILSMComponent component = operationalComponents.get(i); + BTree btree = (BTree) component.getIndex(); + if (i == 0 && component.getType() != LSMComponentType.MEMORY) { + includeMutableComponent = false; + } + btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + btreeAccessors[i].search(rangeCursors[i], reusablePred); + pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]); + } + switchRequest[i] = false; + // any failed switch makes further switches pointless + switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK; + } + } + + private int replaceFrom() throws HyracksDataException { + int replaceFrom = -1; + if (!switchPossible) { + return replaceFrom; + } + for (int i = 0; i < operationalComponents.size(); i++) { + ILSMComponent next = operationalComponents.get(i); + if (next.getType() == LSMComponentType.DISK) { + if (i == 0) { + // if the first component is a disk component, then switch is not possible + switchPossible = false; + } + break; + } else if (next.getState() == ComponentState.UNREADABLE_UNWRITABLE) { + // if the component is UNREADABLE_UNWRITABLE, then it means that the flush has been completed while + // the search cursor is 
inside the component, a switch candidate + if (replaceFrom < 0) { + replaceFrom = i; + } + // we return the outputElement to the priority queue if it came from this component + if (outputElement != null && outputElement.getCursorIndex() == i) { + pushIntoQueueFromCursorAndReplaceThisElement(outputElement); + needPushElementIntoQueue = false; + outputElement = null; + canCallProceed = true; + } + PriorityQueueElement element = remove(outputPriorityQueue, i); + // if this cursor is still active (has an element) + // then we copy the search key to restart the operation after + // replacing the component + if (element != null) { + if (switchComponentTupleBuilders[i] == null) { + switchComponentTupleBuilders[i] = new ArrayTupleBuilder(cmp.getKeyFieldCount()); + } + TupleUtils.copyTuple(switchComponentTupleBuilders[i], element.getTuple(), cmp.getKeyFieldCount()); + } + rangeCursors[i].reset(); + rangeCursors[i].close(); + switchRequest[i] = true; + switchedElements[i] = element; + } + } + return replaceFrom; + } + + private void redoMemoryComponentSearchIfNeeded() throws HyracksDataException { + if (!includeMutableComponent) { + return; + } + // if the last n records, none were from memory and there are writers inside the component, + // we need to re-do the search so the cursor doesn't block modifications due to latches over page + if (tupleFromMemoryComponentCount == 0 + && ((AbstractLSMMemoryComponent) operationalComponents.get(0)).getWriterCount() > 0) { + // When we reach here, we know that the mutable component element is not the outputElement + // since if it was the output element, the tupleFromMemoryComponentCount would be at least 1 + PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); + if (mutableElement != null) { + // if the element is null, then there is nothing to do since no latches are held + if (tupleBuilder == null) { + tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount()); + } + TupleUtils.copyTuple(tupleBuilder, 
mutableElement.getTuple(), cmp.getKeyFieldCount()); + copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + // Unlatches/unpins the leaf page of the index. + rangeCursors[0].reset(); + // Re-traverses the index. + reusablePred.setLowKey(copyTuple, true); + btreeAccessors[0].search(rangeCursors[0], reusablePred); + includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); + } + } + tupleFromMemoryComponentCount = 0; + } + + private PriorityQueueElement remove(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) { + // Scans the PQ for the component's element and delete it Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator(); while (it.hasNext()) { - PriorityQueueElement mutableElement = it.next(); - if (mutableElement.getCursorIndex() == 0) { + PriorityQueueElement e = it.next(); + if (e.getCursorIndex() == cursorIndex) { it.remove(); - return mutableElement; + return e; } } return null; @@ -207,7 +331,7 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor { operationalComponents = lsmInitialState.getOperationalComponents(); lsmHarness = lsmInitialState.getLSMHarness(); searchCallback = lsmInitialState.getSearchOperationCallback(); - predicate = (RangePredicate) lsmInitialState.getSearchPredicate(); + RangePredicate predicate = (RangePredicate) lsmInitialState.getSearchPredicate(); reusablePred.setLowKeyComparator(cmp); reusablePred.setHighKey(predicate.getHighKey(), predicate.isHighKeyInclusive()); reusablePred.setHighKeyComparator(predicate.getHighKeyComparator()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java index a0b1905..08e5af0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java @@ -132,7 +132,7 @@ public class LSMBTreeUtil { int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, - IMetadataPageManagerFactory freePageManagerFactory) { + IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) { LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, false); LSMBTreeTupleWriterFactory deleteTupleWriterFactory = @@ -176,14 +176,15 @@ public class LSMBTreeUtil { return new ExternalBTree(ioManager, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory, transactionComponentFactory, bloomFilterFalsePositiveRate, cmpFactories, mergePolicy, opTracker, - ioScheduler, ioOpCallbackFactory, durable); + ioScheduler, ioOpCallbackFactory, durable, tracer); } public static ExternalBTreeWithBuddy createExternalBTreeWithBuddy(IIOManager ioManager, FileReference file, IBufferCache diskBufferCache, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
- int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory) { + int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory, + ITracer tracer) { ITypeTraits[] buddyBtreeTypeTraits = new ITypeTraits[buddyBTreeFields.length]; IBinaryComparatorFactory[] buddyBtreeCmpFactories = new IBinaryComparatorFactory[buddyBTreeFields.length]; for (int i = 0; i < buddyBtreeTypeTraits.length; i++) { @@ -232,6 +233,6 @@ public class LSMBTreeUtil { return new ExternalBTreeWithBuddy(ioManager, interiorFrameFactory, insertLeafFrameFactory, buddyBtreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, - ioOpCallbackFactory, cmpFactories, buddyBtreeCmpFactories, buddyBTreeFields, durable); + ioOpCallbackFactory, cmpFactories, buddyBtreeCmpFactories, buddyBTreeFields, durable, tracer); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java index ab8e899..fc55ce5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java @@ -129,6 +129,11 @@ public interface ILSMComponent { IIndex getIndex(); /** + * @return the {@link ILSMIndex} this component belong to + */ + ILSMIndex getLsmIndex(); + + /** * * 
@return id of the component * @throws HyracksDataException http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 89c8cb9..b32dd0f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -253,4 +253,17 @@ public interface ILSMHarness { */ void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate) throws HyracksDataException; + + /** + * Replace the memory components in this operation context with their corresponding disk + * components if possible + * + * @param ctx + * the operation context + * @param startIndex + * the index of the first component to switch + * @throws HyracksDataException + */ + void replaceMemoryComponentsWithDiskComponents(ILSMIndexOperationContext ctx, int startIndex) + throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 52f9f06..62493f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -143,4 +143,9 @@ public interface ILSMIndex extends IIndex { * @return The number of all memory components (active and inactive) */ int getNumberOfAllMemoryComponents(); + + /** + * @return the {@link ILSMHarness} of the index + */ + ILSMHarness getHarness(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 5b0378a..ec9124d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -56,4 +56,30 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext { PermutingTupleReference getFilterTuple(); MultiComparator getFilterCmp(); + + /** + * @return the {@link ILSMIndex} of the component + */ + ILSMIndex getIndex(); + + /** + * Performance tracing method. 
Logs the accumulated counters for number of tuples + * + * @param tupleCount + * the number of tuples represented by the counters + */ + void logPerformanceCounters(int tupleCount); + + /** + * Increment the time taken for entering and exiting components + * + * @param increment + * the time increment in nanoseconds + */ + void incrementEnterExitTime(long increment); + + /** + * @return true if performance tracing is enabled, false otherwise + */ + boolean isTracingEnabled(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java index ac124ba..84d2fe5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java @@ -28,7 +28,7 @@ public abstract class AbstractLSMComponent implements ILSMComponent { protected final ILSMComponentFilter filter; protected final AbstractLSMIndex lsmIndex; // Mutables - protected ComponentState state; + protected volatile ComponentState state; protected int readerCount; public AbstractLSMComponent(AbstractLSMIndex lsmIndex, ILSMComponentFilter filter) { @@ -46,4 +46,9 @@ public abstract class AbstractLSMComponent implements ILSMComponent { public ILSMComponentFilter getLSMComponentFilter() { return filter; } + + @Override + public final AbstractLSMIndex getLsmIndex() { + return 
lsmIndex; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index b664102..bb27236 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.logging.Level; import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -49,11 +50,6 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl } @Override - public AbstractLSMIndex getLsmIndex() { - return lsmIndex; - } - - @Override public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) { if (state == ComponentState.INACTIVE) { throw new IllegalStateException("Trying to enter an inactive disk component"); @@ -148,6 +144,9 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl @Override public void markAsValid(boolean persist) throws HyracksDataException { ComponentUtils.markAsValid(getMetadataHolder(), persist); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Marked as valid component with id: " + getId()); + } } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index b0cc318..dc808ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.ErrorCode; @@ -71,6 +73,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.util.trace.ITracer; public abstract class AbstractLSMIndex implements ILSMIndex { + private static final Logger LOGGER = Logger.getLogger(AbstractLSMIndex.class.getName()); protected final ILSMHarness lsmHarness; protected final IIOManager ioManager; protected final ILSMIOOperationScheduler ioScheduler; @@ -142,7 +145,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory 
bulkLoadComponentFactory, - boolean durable) { + boolean durable, ITracer tracer) { this.ioManager = ioManager; this.diskBufferCache = diskBufferCache; this.fileManager = fileManager; @@ -152,6 +155,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { this.componentFactory = componentFactory; this.bulkLoadComponentFactory = bulkLoadComponentFactory; this.durable = durable; + this.tracer = tracer; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; diskComponents = new LinkedList<>(); @@ -303,12 +307,12 @@ public abstract class AbstractLSMIndex implements ILSMIndex { operationalComponents.add(memoryComponents.get(cmc)); break; case INSERT: - addOperationalMutableComponents(operationalComponents); + addOperationalMutableComponents(operationalComponents, true); operationalComponents.addAll(immutableComponents); break; case SEARCH: if (memoryComponentsAllocated) { - addOperationalMutableComponents(operationalComponents); + addOperationalMutableComponents(operationalComponents, false); } if (filterManager != null) { for (ILSMComponent c : immutableComponents) { @@ -375,18 +379,23 @@ public abstract class AbstractLSMIndex implements ILSMIndex { ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); } - private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) { + private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents, boolean modification) { int cmc = currentMutableComponentId.get(); int numMutableComponents = memoryComponents.size(); for (int i = 0; i < numMutableComponents - 1; i++) { ILSMMemoryComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents); if (c.isReadable()) { - // Make sure newest components are added first + // Make sure newest components are added first if readable operationalComponents.add(0, c); } } - // The current mutable component is always added - 
operationalComponents.add(0, memoryComponents.get(cmc)); + // The current mutable component is added if modification operation or if readable + // This ensures that activation of new component only happens in case of modifications + // and allow for controlling that without stopping search operations + ILSMMemoryComponent c = memoryComponents.get(cmc); + if (modification || c.isReadable()) { + operationalComponents.add(0, c); + } } @Override @@ -421,7 +430,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } @Override - public final synchronized void allocateMemoryComponents() throws HyracksDataException { + public synchronized void allocateMemoryComponents() throws HyracksDataException { if (!isActive) { throw HyracksDataException.create(ErrorCode.CANNOT_ALLOCATE_MEMORY_FOR_INACTIVE_INDEX); } @@ -652,7 +661,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex { return filterManager; } - public ILSMHarness getLsmHarness() { + @Override + public ILSMHarness getHarness() { return lsmHarness; } @@ -681,8 +691,15 @@ public abstract class AbstractLSMIndex implements ILSMIndex { public final ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { ILSMIndexAccessor accessor = operation.getAccessor(); ILSMIndexOperationContext opCtx = accessor.getOpContext(); - return opCtx.getOperation() == IndexOperation.DELETE_MEMORY_COMPONENT ? 
EmptyComponent.INSTANCE - : doFlush(operation); + if (opCtx.getOperation() == IndexOperation.DELETE_MEMORY_COMPONENT) { + return EmptyComponent.INSTANCE; + } else { + if (LOGGER.isLoggable(Level.INFO)) { + FlushOperation flushOp = (FlushOperation) operation; + LOGGER.log(Level.INFO, "Flushing component with id: " + flushOp.getFlushingComponent().getId()); + } + return doFlush(operation); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 065d465..0818b08 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,14 +27,18 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import 
org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.util.trace.ITracer; +import org.apache.hyracks.util.trace.ITracer.Scope; public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOperationContext { + protected final ILSMIndex index; protected final PermutingTupleReference indexTuple; protected final MultiComparator filterCmp; protected final PermutingTupleReference filterTuple; @@ -47,10 +51,14 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; + protected final ITracer tracer; + protected final long traceCategory; + private long enterExitTime = 0L; - public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields, + public AbstractLSMIndexOperationContext(ILSMIndex index, int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, - IModificationOperationCallback modificationCallback) { + IModificationOperationCallback modificationCallback, ITracer tracer) { + this.index = index; this.searchCallback = searchCallback; this.modificationCallback = modificationCallback; this.componentHolder = new LinkedList<>(); @@ -73,6 +81,8 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera filterTuple = null; allFields = null; } + this.tracer = tracer; + this.traceCategory = tracer.getRegistry().get("op-ctx"); } @Override @@ -153,4 +163,32 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public final boolean isTracingEnabled() { + return tracer.isEnabled(traceCategory); + } + + @Override + public void logPerformanceCounters(int tupleCount) { + if (isTracingEnabled()) { + 
tracer.instant("store-counters", traceCategory, Scope.t, + "{\"count\":" + tupleCount + ",\"enter-exit-duration-ns\":" + enterExitTime + "}"); + resetCounters(); + } + } + + public void resetCounters() { + enterExitTime = 0L; + } + + @Override + public void incrementEnterExitTime(long increment) { + enterExitTime += increment; + } + + @Override + public ILSMIndex getIndex() { + return index; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 6a186dc..17dadcb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -19,6 +19,8 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; @@ -32,6 +34,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent { + private static final Logger LOGGER = Logger.getLogger(AbstractLSMMemoryComponent.class.getName()); private final IVirtualBufferCache vbc; 
private final AtomicBoolean isModified; private int writerCount; @@ -277,6 +280,9 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im throw new IllegalStateException( "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId); } + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Component Id was reset from " + this.componentId + " to " + componentId); + } this.componentId = componentId; LSMComponentIdUtils.persist(this.componentId, metadata); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java new file mode 100644 index 0000000..98c1560 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; + +/** Operation context used only for component replacement: it looks up disk components whose ids equal those of the components in its holder and swaps them into another context's component holder. Callbacks, tuples, the search predicate and the index are not provided (those getters return null or an empty list), and several setters throw UnsupportedOperationException. */ public class ComponentReplacementContext implements ILSMIndexOperationContext { + private static final Logger LOGGER = Logger.getLogger(ComponentReplacementContext.class.getName()); + /* the list handed out by getComponentHolder() */ private final List<ILSMComponent> components; + /* matches appended by proceed(), one per holder entry scanned */ private final List<ILSMComponent> diskComponents; + /* ids recorded by proceed() */ private final List<ILSMComponentId> replacedComponentIds; + /* positions recorded by swapIndex() */ private final int[] swapIndexes; + /* number of slots of swapIndexes in use */ private int count = 0; + /* set to true by reset() and to false by prepareToEnter() */ boolean accessingComponent = true; + + /** Uses lsmIndex.getNumberOfAllMemoryComponents() as the initial capacity of the three lists and as the fixed length of swapIndexes. */
public ComponentReplacementContext(ILSMIndex lsmIndex) { + components = new ArrayList<>(lsmIndex.getNumberOfAllMemoryComponents()); + replacedComponentIds = new ArrayList<>(lsmIndex.getNumberOfAllMemoryComponents()); + swapIndexes = new int[lsmIndex.getNumberOfAllMemoryComponents()]; + diskComponents = new ArrayList<>(lsmIndex.getNumberOfAllMemoryComponents()); + } + + @Override + public void setOperation(IndexOperation newOp) throws HyracksDataException { + // Do nothing + } + + /** Always reports IndexOperation.SEARCH; setOperation() above is a no-op. */ @Override + public IndexOperation getOperation() { + return IndexOperation.SEARCH; + } + + /** Clears the three lists, sets count to 0 and sets accessingComponent back to true. The contents of swapIndexes are left as they are; replace() only reads the first count entries. */ @Override + public void reset() { + accessingComponent = true; + components.clear(); + diskComponents.clear(); + replacedComponentIds.clear(); + count = 0; + } + + /** Returns the internal list itself, not a copy. */ @Override + public List<ILSMComponent> getComponentHolder() { + return components; + } + + @Override + public List<ILSMDiskComponent> getComponentsToBeMerged() { + return Collections.emptyList(); + } + + @Override + public ISearchOperationCallback getSearchOperationCallback() { + return null; + } + + @Override + public IModificationOperationCallback getModificationCallback() { + return null; + } + + @Override + public void setCurrentMutableComponentId(int currentMutableComponentId) { + throw new UnsupportedOperationException(); + } + + @Override + public void setSearchPredicate(ISearchPredicate searchPredicate) { + throw new UnsupportedOperationException(); + } + + @Override + public ISearchPredicate getSearchPredicate() { + return null; + } + + @Override + public List<ILSMDiskComponent> getComponentsToBeReplicated() { + return Collections.emptyList(); + } + + @Override + public boolean isAccessingComponents() { + return accessingComponent; + } + + @Override + public void setAccessingComponents(boolean accessingComponents) { + // Ignore since this is only used for component replacement + } + + @Override + public PermutingTupleReference getIndexTuple() { + return null; + } + + @Override + public PermutingTupleReference getFilterTuple() { + 
return null; + } + + @Override + public MultiComparator getFilterCmp() { + return null; + } + + @Override + public void logPerformanceCounters(int tupleCount) { + throw new UnsupportedOperationException(); + } + + @Override + public void incrementEnterExitTime(long increment) { + // Ignore since this is only used for component replacement + } + + /** Records the id of each component in the holder and searches allDiskComponents for a disk component with an equal id, appending every match to diskComponents. Entries recorded before a failed lookup are not rolled back. @param allDiskComponents the disk components to search @return true if every id was matched; false, after logging a warning, at the first id with no match @throws HyracksDataException declared in the signature; which call raises it is not visible here */ public boolean proceed(List<ILSMDiskComponent> allDiskComponents) throws HyracksDataException { + for (int i = 0; i < components.size(); i++) { + replacedComponentIds.add(components.get(i).getId()); + // ensure that disk component exists + boolean found = false; + LOGGER.log(Level.INFO, "Looking for a component with the id: " + replacedComponentIds.get(i)); + for (int j = 0; j < allDiskComponents.size(); j++) { + ILSMDiskComponent dc = allDiskComponents.get(j); + ILSMComponentId diskComponentId = dc.getId(); + LOGGER.log(Level.INFO, "Next disk component id: " + diskComponentId); + if (diskComponentId.equals(replacedComponentIds.get(i))) { + found = true; + diskComponents.add(dc); + break; + } + } + if (!found) { + // component has been merged? 
+ LOGGER.log(Level.WARNING, "Memory Component with id = " + replacedComponentIds.get(i) + + " was flushed and merged before search cursor replaces it"); + return false; + } + } + return true; + } + + /** Stores i in the next free slot of swapIndexes and increments count. There is no bounds check, so calling this more times than the array length chosen in the constructor raises ArrayIndexOutOfBoundsException. */ public void swapIndex(int i) { + swapIndexes[count] = i; + count++; + } + + /** Replaces the holder's contents with the disk components collected by proceed() and sets accessingComponent to false. */ public void prepareToEnter() { + components.clear(); + components.addAll(diskComponents); + accessingComponent = false; + } + + /** For each of the first count entries of swapIndexes, removes the component at that position from ctx.getComponentHolder() and inserts diskComponents.get(i) at the same position. @param ctx the operation context whose holder is modified @throws IllegalStateException if a removed component's type is not LSMComponentType.MEMORY; any exception is logged at WARNING and rethrown */ public void replace(ILSMIndexOperationContext ctx) { + // Called after exit and enter have been completed + try { + for (int i = 0; i < count; i++) { + ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]); + if (removed.getType() == LSMComponentType.MEMORY) { + LOGGER.log(Level.INFO, "Removed a memory component from the search operation"); + } else { + throw new IllegalStateException("Disk components can't be removed from the search operation"); + } + ctx.getComponentHolder().add(swapIndexes[i], diskComponents.get(i)); + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failure replacing memory components with disk components", e); + throw e; + } + } + + @Override + public ILSMIndex getIndex() { + return null; + } + + @Override + public boolean isTracingEnabled() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java index dd86f65..442af56 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java @@ -78,13 +78,7 @@ public class LSMComponentId implements ILSMComponentId { return false; } LSMComponentId other = (LSMComponentId) obj; - if (maxId != other.maxId) { - return false; - } - if (minId != other.minId) { - return false; - } - return true; + return maxId == other.maxId && minId == other.minId; } @Override
