http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java new file mode 100644 index 0000000..7d1925b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; + +public class EmptyDiskComponentMetadata extends DiskComponentMetadata { + public static final EmptyDiskComponentMetadata INSTANCE = new EmptyDiskComponentMetadata(); + + private EmptyDiskComponentMetadata() { + super(null); + } + + @Override + public void put(IValueReference key, IValueReference value) throws HyracksDataException { + // No op + } + + @Override + public void get(IValueReference key, IPointable value) throws HyracksDataException { + throw new IllegalStateException("Attempt to read metadata of empty component"); + } + + @Override + public IValueReference get(IValueReference key) throws HyracksDataException { + throw new IllegalStateException("Attempt to read metadata of empty component"); + } + + @Override + public void put(MemoryComponentMetadata metadata) throws HyracksDataException { + // No op + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index e9b2058..2f65b18 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -236,7 +236,7 @@ public class ExternalIndexHarness extends LSMHarness { try { newComponent = lsmIndex.merge(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); - lsmIndex.markAsValid(newComponent); + newComponent.markAsValid(lsmIndex.isDurable()); } finally { exitComponents(ctx, LSMOperationType.MERGE, newComponent, false); operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent); @@ -248,7 +248,7 @@ public class ExternalIndexHarness extends LSMHarness { @Override public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException { - lsmIndex.markAsValid(c); + c.markAsValid(lsmIndex.isDurable()); synchronized (opTracker) { lsmIndex.addDiskComponent(c); if (replicationEnabled) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java index 1173aeb..7b7f950 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java @@ -26,16 +26,12 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> { - protected final ILSMMemoryComponent flushingComponent; - - public FlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent, FileReference target, - ILSMIOOperationCallback callback, String indexIdentifier) { + public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, + String indexIdentifier) { super(accessor, target, callback, indexIdentifier); - this.flushingComponent = flushingComponent; } @Override @@ -55,7 +51,7 @@ public class FlushOperation extends AbstractIoOperation implements Comparable<IL } public ILSMComponent getFlushingComponent() { - return flushingComponent; + return accessor.getOpContext().getComponentHolder().get(0); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java deleted file mode 100644 index 094b6c6..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.storage.am.lsm.common.impls; - -public enum LSMComponentState { - FLUSHING, - MERGING, - DONE_FLUSHING, - DONE_MERGING -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 1fc702c..1ef807f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,6 +52,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.util.IOOperationUtils; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.util.trace.Tracer; @@ -120,7 +122,8 @@ public class LSMHarness implements ILSMHarness { } break; case MERGE: - if (ctx.getComponentHolder().size() < 2) { + if (ctx.getComponentHolder().size() < 2 + && ctx.getOperation() != IndexOperation.DELETE_DISK_COMPONENTS) { // There is only a single component. There is nothing to merge. return false; } @@ -518,7 +521,7 @@ public class LSMHarness implements ILSMHarness { try { newComponent = lsmIndex.flush(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); - lsmIndex.markAsValid(newComponent); + newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { failedOperation = true; if (LOGGER.isLoggable(Level.SEVERE)) { @@ -569,7 +572,7 @@ public class LSMHarness implements ILSMHarness { try { newComponent = lsmIndex.merge(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); - lsmIndex.markAsValid(newComponent); + newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { failedOperation = true; if (LOGGER.isLoggable(Level.SEVERE)) { @@ -602,7 +605,7 @@ public class LSMHarness implements ILSMHarness { @Override public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException { - lsmIndex.markAsValid(c); + c.markAsValid(lsmIndex.isDurable()); synchronized (opTracker) { lsmIndex.addDiskComponent(c); if (replicationEnabled) { @@ -753,6 +756,105 @@ public class LSMHarness implements ILSMHarness { } @Override + public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate) + throws HyracksDataException { + BlockingIOOperationCallbackWrapper ioCallback = + new BlockingIOOperationCallbackWrapper(lsmIndex.getIOOperationCallback()); + boolean deleteMemoryComponent; + synchronized (opTracker) { + waitForFlushesAndMerges(); + ensureNoFailedFlush(); + // We always start with the memory component + ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent(); + deleteMemoryComponent = predicate.test(memComponent); + if (deleteMemoryComponent) { + // schedule a delete for flushed component + ctx.reset(); + ctx.setOperation(IndexOperation.DELETE_MEMORY_COMPONENT); + // ScheduleFlush is actually a try operation + scheduleFlush(ctx, ioCallback); + } + } + // Here, we are releasing the opTracker to allow other operations: + // (searches, delete flush we will schedule, delete merge we will schedule). + if (deleteMemoryComponent) { + IOOperationUtils.waitForIoOperation(ioCallback); + } + ctx.reset(); + ioCallback = new BlockingIOOperationCallbackWrapper(lsmIndex.getIOOperationCallback()); + ctx.setOperation(IndexOperation.DELETE_DISK_COMPONENTS); + List<ILSMDiskComponent> toBeDeleted; + synchronized (opTracker) { + waitForFlushesAndMerges(); + // Ensure that current memory component is empty and that no failed flushes happened so far + // This is a workaround until ASTERIXDB-2106 is fixed + ensureNoFailedFlush(); + List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents(); + for (ILSMDiskComponent component : diskComponents) { + if (predicate.test(component)) { + ctx.getComponentsToBeMerged().add(component); + } + } + if (ctx.getComponentsToBeMerged().isEmpty()) { + return; + } + toBeDeleted = new ArrayList<>(ctx.getComponentsToBeMerged()); + // ScheduleMerge is actually a try operation + scheduleMerge(ctx, ioCallback); + } + IOOperationUtils.waitForIoOperation(ioCallback); + // ensure that merge has succeeded + for (ILSMDiskComponent component : toBeDeleted) { + if (lsmIndex.getDiskComponents().contains(component)) { + throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED); + } + } + } + + /** + * This can only be called in the steady state where: + * 1. no scheduled flushes + * 2. no incoming data + * + * @throws HyracksDataException + */ + private void ensureNoFailedFlush() throws HyracksDataException { + for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) { + if (memoryComponent.getState() == ComponentState.READABLE_UNWRITABLE) { + throw HyracksDataException.create(ErrorCode.A_FLUSH_OPERATION_HAS_FAILED); + } + } + } + + private void waitForFlushesAndMerges() throws HyracksDataException { + while (flushingOrMerging()) { + try { + opTracker.wait(); // NOSONAR: OpTracker is always synchronized here + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Interrupted while attempting component level delete", e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + } + + private boolean flushingOrMerging() { + // check if flushes are taking place + for (ILSMMemoryComponent memComponent : lsmIndex.getMemoryComponents()) { + if (memComponent.getState() == ComponentState.READABLE_UNWRITABLE_FLUSHING) { + return true; + } + } + // check if merges are taking place + for (ILSMDiskComponent diskComponent : lsmIndex.getDiskComponents()) { + if (diskComponent.getState() == ComponentState.READABLE_MERGING) { + return true; + } + } + return false; + } + + @Override public String toString() { return getClass().getSimpleName() + ":" + lsmIndex; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index a45225d..c0fd443 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.util.List; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; @@ -30,6 +31,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -123,7 +125,6 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { @Override public void merge(ILSMIOOperation operation) throws HyracksDataException { - ctx.setOperation(IndexOperation.MERGE); lsmHarness.merge(ctx, operation); } @@ -224,4 +225,14 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { public String toString() { return getClass().getSimpleName() + ':' + lsmHarness.toString(); } + + @Override + public void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException { + lsmHarness.deleteComponents(ctx, predicate); + } + + @Override + public ILSMIndexOperationContext getOpContext() { + return ctx; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java index 3540b84..c83d534 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java @@ -28,19 +28,16 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; public class MergeOperation extends AbstractIoOperation { - - protected final List<ILSMComponent> mergingComponents; protected final IIndexCursor cursor; public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, List<ILSMComponent> mergingComponents, IIndexCursor cursor) { + String indexIdentifier, IIndexCursor cursor) { super(accessor, target, callback, indexIdentifier); - this.mergingComponents = mergingComponents; this.cursor = cursor; } public List<ILSMComponent> getMergingComponents() { - return mergingComponents; + return accessor.getOpContext().getComponentHolder(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index 6878910..7d7266e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -52,7 +52,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { @Override public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException { - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents()); if (!areComponentsReadableWritableState(immutableComponents)) { return; @@ -140,7 +140,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * there will be no new merge either in this situation. */ - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents()); // reverse the list so that we look from the oldest to the newest components Collections.reverse(immutableComponents); int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents); @@ -225,7 +225,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @throws IndexException */ protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents()); // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index d801a44..08e5f94 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.util.trace.Tracer; import org.apache.hyracks.util.trace.Tracer.Scope; @@ -94,6 +95,11 @@ class TracedIOOperation implements ILSMIOOperation { public FileReference getTarget() { return ioOp.getTarget(); } + + @Override + public ILSMIndexAccessor getAccessor() { + return ioOp.getAccessor(); + } } class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> { @@ -122,4 +128,4 @@ class ComparableTracedIOOperation extends TracedIOOperation implements Comparabl + other.getClass().getSimpleName() + " in " + getClass().getSimpleName()); return Integer.signum(hashCode() - other.hashCode()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java new file mode 100644 index 0000000..d9f5c8f --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.util; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; +import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; + +public class ComponentUtils { + + private static final Logger LOGGER = Logger.getLogger(ComponentUtils.class.getName()); + public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes()); + public static final long NOT_FOUND = -1L; + + private ComponentUtils() { + } + + /** + * Get a long value from the metadata of a component or return a default value + * + * @param metadata + * the component's metadata + * @param key + * the key + * @param defaultValue + * the default value + * @return + * the long value if found, the default value otherwise + * @throws HyracksDataException + * If the comopnent was a disk component and an IO error was encountered + */ + public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue) + throws HyracksDataException { + IValueReference value = metadata.get(key); + return value == null || value.getLength() == 0 ? defaultValue + : LongPointable.getLong(value.getByteArray(), value.getStartOffset()); + } + + /** + * Get a value from an index's metadata pages. It first, searches the current in memory component + * then searches the other components. in reverse order. + * Note: This method locks on the OpTracker of the index + * + * @param index + * @param key + * @param pointable + * @throws HyracksDataException + */ + public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { + LOGGER.log(Level.INFO, "Getting " + key + " from index " + index); + // Lock the opTracker to ensure index components don't change + synchronized (index.getOperationTracker()) { + index.getCurrentMemoryComponent().getMetadata().get(key, pointable); + if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index); + // was not found in the in current mutable component, search in the other in memory components + fromImmutableMemoryComponents(index, key, pointable); + if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in all immmutable memory components of " + index); + // was not found in the in all in memory components, search in the disk components + fromDiskComponents(index, key, pointable); + if (pointable.getLength() == 0) { + LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index); + } else { + LOGGER.log(Level.INFO, key + " was found in disk components of " + index); + } + } else { + LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index); + } + } else { + LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index); + } + } + } + + /** + * Put LSM metadata state into the index's current memory component. + * + * @param index, + * the LSM index. + * @param key, + * the key for the metadata state. + * @param pointable, + * the value for the metadata state. + * @throws HyracksDataException + */ + public static void put(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { + // write the opTracker to ensure the component layout don't change + synchronized (index.getOperationTracker()) { + index.getCurrentMemoryComponent().getMetadata().put(key, pointable); + } + } + + private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable) + throws HyracksDataException { + LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index); + for (ILSMDiskComponent c : index.getDiskComponents()) { + LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c); + c.getMetadata().get(key, pointable); + if (pointable.getLength() != 0) { + // Found + return; + } + } + } + + private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) { + LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index); + List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); + int numOtherMemComponents = memComponents.size() - 1; + int next = index.getCurrentMemoryComponentIndex(); + LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components"); + for (int i = 0; i < numOtherMemComponents; i++) { + LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1)); + next = next - 1; + if (next < 0) { + next = memComponents.size() - 1; + } + ILSMMemoryComponent c = index.getMemoryComponents().get(next); + if (c.isReadable()) { + c.getMetadata().get(key, pointable); + if (pointable.getLength() != 0) { + // Found + return; + } + } + } + } + + public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk) throws HyracksDataException { + int fileId = treeIndex.getFileId(); + IBufferCache bufferCache = treeIndex.getBufferCache(); + treeIndex.getPageManager().close(); + // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page + // won't be flushed to disk because it won't be dirty until the write latch has been released. + // Force modified metadata page to disk. + // If the index is not durable, then the flush is not necessary. + if (forceToDisk) { + bufferCache.force(fileId, true); + } + } + + public static void markAsValid(IBufferCache bufferCache, BloomFilter filter, boolean forceToDisk) + throws HyracksDataException { + if (forceToDisk) { + bufferCache.force(filter.getFileId(), true); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java new file mode 100644 index 0000000..0aeb0b9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.util; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper; + +public class IOOperationUtils { + private static final Logger LOGGER = Logger.getLogger(IOOperationUtils.class.getName()); + + private IOOperationUtils() { + } + + public static void waitForIoOperation(BlockingIOOperationCallbackWrapper ioCallback) throws HyracksDataException { + // Note that the following call assumes that the io operation has succeeded. + try { + ioCallback.waitForIO(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Operation has been interrupted. returning"); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java deleted file mode 100644 index 40017d1..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.storage.am.lsm.common.utils; - -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.data.std.primitive.LongPointable; -import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; -import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; - -public class ComponentMetadataUtil { - - private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName()); - public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes()); - public static final long NOT_FOUND = -1L; - - private ComponentMetadataUtil() { - } - - /** - * Get a long value from the metadata of a component or return a default value - * - * @param metadata - * the component's metadata - * @param key - * the key - * @param defaultValue - * the default value - * @return - * the long value if found, the default value otherwise - * @throws HyracksDataException - * If the comopnent was a disk component and an IO error was encountered - */ - public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue) - throws HyracksDataException { - IValueReference value = metadata.get(key); - return value == null || value.getLength() == 0 ? defaultValue - : LongPointable.getLong(value.getByteArray(), value.getStartOffset()); - } - - /** - * Get a value from an index's metadata pages. It first, searches the current in memory component - * then searches the other components. in reverse order. - * Note: This method locks on the OpTracker of the index - * - * @param index - * @param key - * @param pointable - * @throws HyracksDataException - */ - public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { - LOGGER.log(Level.INFO, "Getting " + key + " from index " + index); - // Lock the opTracker to ensure index components don't change - synchronized (index.getOperationTracker()) { - index.getCurrentMemoryComponent().getMetadata().get(key, pointable); - if (pointable.getLength() == 0) { - LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index); - // was not found in the in current mutable component, search in the other in memory components - fromImmutableMemoryComponents(index, key, pointable); - if (pointable.getLength() == 0) { - LOGGER.log(Level.INFO, key + " was not found in all immmutable memory components of " + index); - // was not found in the in all in memory components, search in the disk components - fromDiskComponents(index, key, pointable); - if (pointable.getLength() == 0) { - LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index); - } else { - LOGGER.log(Level.INFO, key + " was found in disk components of " + index); - } - } else { - LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index); - } - } else { - LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index); - } - } - } - - /** - * Put LSM metadata state into the index's current memory component. - * - * @param index, - * the LSM index. - * @param key, - * the key for the metadata state. - * @param pointable, - * the value for the metadata state. - * @throws HyracksDataException - */ - public static void put(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { - // write the opTracker to ensure the component layout don't change - synchronized (index.getOperationTracker()) { - index.getCurrentMemoryComponent().getMetadata().put(key, pointable); - } - } - - private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable) - throws HyracksDataException { - LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index); - for (ILSMDiskComponent c : index.getImmutableComponents()) { - LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c); - c.getMetadata().get(key, pointable); - if (pointable.getLength() != 0) { - // Found - return; - } - } - } - - private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) { - LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index); - List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); - int numOtherMemComponents = memComponents.size() - 1; - int next = index.getCurrentMemoryComponentIndex(); - LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components"); - for (int i = 0; i < numOtherMemComponents; i++) { - LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1)); - next = next - 1; - if (next < 0) { - next = memComponents.size() - 1; - } - ILSMMemoryComponent c = index.getMemoryComponents().get(next); - if (c.isReadable()) { - c.getMetadata().get(key, pointable); - if (pointable.getLength() != 0) { - // Found - return; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 7994bf0..4cd8543 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -219,7 +219,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex if (ctx.getIndexTuple() != null) { ctx.getIndexTuple().reset(tuple); indexTuple = ctx.getIndexTuple(); - ((InMemoryInvertedIndexAccessor)(ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple); + ((InMemoryInvertedIndexAccessor) (ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple); } else { indexTuple = tuple; } @@ -330,7 +330,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex } @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { LSMInvertedIndexFlushOperation flushOp = (LSMInvertedIndexFlushOperation) operation; // Create an inverted index instance to be bulk loaded. @@ -408,13 +408,12 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex } @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null); ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); // Scan diskInvertedIndexes ignoring the memoryInvertedIndex. search(opCtx, cursor, mergePred); @@ -620,26 +619,6 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex } @Override - public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException { - LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent; - OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex(); - IBufferCache bufferCache = invIndex.getBufferCache(); - markAsValidInternal(invIndex.getBufferCache(), invIndexComponent.getBloomFilter()); - - // Flush inverted index second. - bufferCache.force(invIndex.getInvListsFileId(), true); - markAsValidInternal(invIndex.getBTree()); - - // Flush deleted keys BTree. - markAsValidInternal(invIndexComponent.getDeletedKeysBTree()); - } - - @Override - public String toString() { - return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]"; - } - - @Override public boolean isPrimaryIndex() { return false; } @@ -709,22 +688,20 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) + throws HyracksDataException { return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx), - flushingComponent, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath()); + componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx); IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx); - return new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor, - mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), + mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, + fileManager.getBaseDir().getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index dddd14a..61fc84e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -19,12 +19,14 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; import java.util.List; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -213,4 +215,14 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd public String toString() { return getClass().getSimpleName() + ':' + lsmHarness.toString(); } + + @Override + public void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException { + lsmHarness.deleteComponents(ctx, predicate); + } + + @Override + public ILSMIndexOperationContext getOpContext() { + return ctx; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java index 2470a39..b77f894 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java @@ -24,8 +24,10 @@ import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex; import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; public class LSMInvertedIndexDiskComponent extends AbstractLSMDiskComponent { @@ -80,4 +82,17 @@ public class LSMInvertedIndexDiskComponent extends AbstractLSMDiskComponent { public String toString() { return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath(); } + + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + IBufferCache bufferCache = invIndex.getBufferCache(); + ComponentUtils.markAsValid(invIndex.getBufferCache(), bloomFilter, persist); + + // Flush inverted index second. + bufferCache.force(((OnDiskInvertedIndex) invIndex).getInvListsFileId(), true); + ComponentUtils.markAsValid(((OnDiskInvertedIndex) invIndex).getBTree(), persist); + + // Flush deleted keys BTree. + ComponentUtils.markAsValid(deletedKeysBTree, persist); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java index df4f095..2106f6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java @@ -22,17 +22,16 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; public class LSMInvertedIndexFlushOperation extends FlushOperation { private final FileReference deletedKeysBTreeFlushTarget; private final FileReference bloomFilterFlushTarget; - public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent, - FileReference flushTarget, FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget, + public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, + FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushingComponent, flushTarget, callback, indexIdentifier); + super(accessor, flushTarget, callback, indexIdentifier); this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java index da374dc..2c1db0f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java @@ -19,10 +19,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -32,10 +29,10 @@ public class LSMInvertedIndexMergeOperation extends MergeOperation { private final FileReference deletedKeysBTreeMergeTarget; private final FileReference bloomFilterMergeTarget; - public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents, - IIndexCursor cursor, FileReference target, FileReference deletedKeysBTreeMergeTarget, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, mergingComponents, cursor); + public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target, + FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget, + ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java index eb03696..4f08dd3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java @@ -294,11 +294,6 @@ public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITree } @Override - public String toString() { - return "LSMRTree [" + fileManager.getBaseDir() + "]"; - } - - @Override protected void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException { LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) c; ((IVirtualBufferCache) mutableComponent.getRTree().getBufferCache()).open(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index e39c3f9..6595403 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -136,7 +136,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { // is needed. // It only needs to return the newer list @Override - public List<ILSMDiskComponent> getImmutableComponents() { + public List<ILSMDiskComponent> getDiskComponents() { if (version == 0) { return diskComponents; } else { @@ -258,12 +258,11 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { // This can be done in a better way by creating a method boolean // keepDeletedTuples(mergedComponents); @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null); ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); search(opCtx, cursor, rtreeSearchPred); LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), @@ -424,7 +423,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { // Not supported @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree"); } @@ -570,7 +569,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { } else if (isTransaction) { // Since this is a transaction component, validate and // deactivate. it could later be added or deleted - markAsValid(component); + component.markAsValid(durable); RTree rtree = ((LSMRTreeDiskComponent) component).getRTree(); BTree btree = ((LSMRTreeDiskComponent) component).getBTree(); BloomFilter bloomFilter = ((LSMRTreeDiskComponent) component).getBloomFilter(); @@ -621,11 +620,6 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { } } - @Override - public String toString() { - return "LSMTwoPCRTree [" + fileManager.getBaseDir() + "]"; - } - // The only change the the schedule merge is the method used to create the // opCtx. first line <- in schedule merge, we-> @Override @@ -640,7 +634,7 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1)); ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields); // create the merge operation. - LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, + LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); ioScheduler.scheduleOperation(mergeOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 0a47aea..ca0e4e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -56,7 +56,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -163,7 +162,7 @@ public class LSMRTree extends AbstractLSMRTree { } @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation; LSMRTreeMemoryComponent flushingComponent = (LSMRTreeMemoryComponent) flushOp.getFlushingComponent(); // Renaming order is critical because we use assume ordering when we @@ -263,19 +262,15 @@ public class LSMRTree extends AbstractLSMRTree { } @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null); ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); search(opCtx, cursor, rtreeSearchPred); - LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true); - ILSMDiskComponentBulkLoader componentBulkLoader; - // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that // lsmHarness.endSearch() is called once when the r-trees have been merged. if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents @@ -409,14 +404,6 @@ public class LSMRTree extends AbstractLSMRTree { } @Override - public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException { - LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent; - markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter()); - markAsValidInternal((component).getBTree()); - markAsValidInternal((component).getRTree()); - } - - @Override public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { Set<String> files = new HashSet<>(); LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent; @@ -428,22 +415,21 @@ public class LSMRTree extends AbstractLSMRTree { @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) + throws HyracksDataException { LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); - return new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(), + return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { ITreeIndexCursor cursor = new LSMRTreeSortedCursor(opCtx, linearizer, buddyBTreeFields); ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); - return new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, - mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), + mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, + fileManager.getBaseDir().getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java index 54ef122..0f7943d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java @@ -24,6 +24,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.rtree.impls.RTree; public class LSMRTreeDiskComponent extends AbstractLSMDiskComponent { @@ -81,4 +82,15 @@ public class LSMRTreeDiskComponent extends AbstractLSMDiskComponent { public String toString() { return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath(); } + + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + if (bloomFilter != null) { + ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist); + } + if (btree != null) { + ComponentUtils.markAsValid(btree, persist); + } + ComponentUtils.markAsValid(rtree, persist); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java index f3e45ac..6991c56 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java @@ -21,7 +21,6 @@ package org.apache.hyracks.storage.am.lsm.rtree.impls; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; public class LSMRTreeFlushOperation extends FlushOperation { @@ -29,10 +28,9 @@ public class LSMRTreeFlushOperation extends FlushOperation { private final FileReference btreeFlushTarget; private final FileReference bloomFilterFlushTarget; - public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent, - FileReference flushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushingComponent, flushTarget, callback, indexIdentifier); + public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget, + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.btreeFlushTarget = btreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java index 9b3aa0c..83872cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java @@ -18,11 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.rtree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -31,10 +28,10 @@ public class LSMRTreeMergeOperation extends MergeOperation { private final FileReference btreeMergeTarget; private final FileReference bloomFilterMergeTarget; - public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents, - ITreeIndexCursor cursor, FileReference target, FileReference btreeMergeTarget, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, mergingComponents, cursor); + public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, + FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, + String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.btreeMergeTarget = btreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index 94648fb..1e15455 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -47,7 +47,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -124,7 +123,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { } @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation; // Renaming order is critical because we use assume ordering when we // read the file names when we open the tree. @@ -212,12 +211,11 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { } @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { MergeOperation mergeOp = (MergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null); ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); search(opCtx, cursor, rtreeSearchPred); // Bulk load the tuples from all on-disk RTrees into the new RTree. @@ -321,12 +319,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { } @Override - public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException { - RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree(); - markAsValidInternal(rtree); - } - - @Override public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { Set<String> files = new HashSet<>(); RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree(); @@ -336,24 +328,24 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) + throws HyracksDataException { ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - return new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(), - null, null, callback, fileManager.getBaseDir().getAbsolutePath()); + return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null, + callback, fileManager.getBaseDir().getAbsolutePath()); } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { boolean returnDeletedTuples = false; + List<ILSMComponent> mergingComponents = opCtx.getComponentHolder(); if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) { returnDeletedTuples = true; } ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback, - fileManager.getBaseDir().getAbsolutePath(), mergingComponents, cursor); + fileManager.getBaseDir().getAbsolutePath(), cursor); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml index 55c5710..597ce59 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml @@ -36,6 +36,27 @@ <skip>true</skip> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <includes> + <include>**/*.class</include> + <include>**/*.properties</include> + <include>**/README*</include> + <include>**/NOTICE*</include> + <include>**/LICENSE*</include> + <include>**/DEPENDENCIES*</include> + </includes> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java index fd76a10..5d6d8de 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java @@ -76,12 +76,12 @@ public class LSMBTreeFileManagerTest { // Make sure the disk component was generated LSMBTree btree = (LSMBTree) ctx.getIndex(); - Assert.assertEquals("Check disk components", 1, btree.getImmutableComponents().size()); + Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size()); ctx.getIndex().deactivate(); // Delete the btree file and keep the bloom filter file from the disk component - LSMBTreeDiskComponent ilsmDiskComponent = (LSMBTreeDiskComponent) btree.getImmutableComponents().get(0); + LSMBTreeDiskComponent ilsmDiskComponent = (LSMBTreeDiskComponent) btree.getDiskComponents().get(0); ilsmDiskComponent.getBTree().getFileReference().delete(); File bloomFilterFile = ilsmDiskComponent.getBloomFilter().getFileReference().getFile().getAbsoluteFile(); @@ -90,6 +90,6 @@ public class LSMBTreeFileManagerTest { // Activating the index again should delete the orphaned bloom filter file as well as the disk component ctx.getIndex().activate(); Assert.assertEquals("Check bloom filter file deleted", false, bloomFilterFile.exists()); - Assert.assertEquals("Check disk components", 0, btree.getImmutableComponents().size()); + Assert.assertEquals("Check disk components", 0, btree.getDiskComponents().size()); } }
