http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java new file mode 100644 index 0000000..508a6cc --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; + +public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent { + + private final DiskComponentMetadata metadata; + + public AbstractLSMDiskComponent(IMetadataPageManager mdPageManager, ILSMComponentFilter filter) { + super(filter); + state = ComponentState.READABLE_UNWRITABLE; + metadata = new DiskComponentMetadata(mdPageManager); + } + + @Override + public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) { + if (state == ComponentState.INACTIVE) { + throw new IllegalStateException("Trying to enter an inactive disk component"); + } + + switch (opType) { + case FORCE_MODIFICATION: + case MODIFICATION: + case REPLICATE: + case SEARCH: + readerCount++; + break; + case MERGE: + if (state == ComponentState.READABLE_MERGING) { + // This should never happen unless there are two concurrent merges that were scheduled + // concurrently and they have interleaving components to be merged. + // This should be handled properly by the merge policy, but we guard against that here anyway. 
+ return false; + } + state = ComponentState.READABLE_MERGING; + readerCount++; + break; + default: + throw new UnsupportedOperationException("Unsupported operation " + opType); + } + return true; + } + + @Override + public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent) + throws HyracksDataException { + switch (opType) { + case MERGE: + // In case two merge operations were scheduled to merge an overlapping set of components, + // the second merge will fail and it must reset those components back to their previous state. + if (failedOperation) { + state = ComponentState.READABLE_UNWRITABLE; + } + // Fallthrough + case FORCE_MODIFICATION: + case MODIFICATION: + case REPLICATE: + case SEARCH: + readerCount--; + if (readerCount == 0 && state == ComponentState.READABLE_MERGING) { + state = ComponentState.INACTIVE; + } + break; + default: + throw new UnsupportedOperationException("Unsupported operation " + opType); + } + + if (readerCount <= -1) { + throw new IllegalStateException("Invalid LSM disk component readerCount: " + readerCount); + } + } + + @Override + public DiskComponentMetadata getMetadata() { + return metadata; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index eaca234..aa23093 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -34,30 +34,32 @@ import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionTy import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; -public abstract class AbstractLSMIndex implements ILSMIndexInternal { +public abstract class AbstractLSMIndex implements ILSMIndex { protected final ILSMHarness lsmHarness; protected final IIOManager ioManager; protected final ILSMIOOperationScheduler ioScheduler; protected final ILSMIOOperationCallback ioOpCallback; // In-memory components. - protected final List<ILSMComponent> memoryComponents; + protected final List<ILSMMemoryComponent> memoryComponents; protected final List<IVirtualBufferCache> virtualBufferCaches; protected AtomicInteger currentMutableComponentId; @@ -65,8 +67,9 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { protected final IBufferCache diskBufferCache; protected final ILSMIndexFileManager fileManager; protected final IFileMapProvider diskFileMapProvider; - protected final List<ILSMComponent> diskComponents; - protected final List<ILSMComponent> inactiveDiskComponents; + // components with lower indexes are newer than components with higher index + protected final List<ILSMDiskComponent> diskComponents; + protected final List<ILSMDiskComponent> inactiveDiskComponents; protected final double bloomFilterFalsePositiveRate; protected final ILSMComponentFilterFrameFactory filterFrameFactory; protected final LSMComponentFilterManager filterManager; @@ -77,11 +80,11 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { protected boolean 
memoryComponentsAllocated = false; public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches, - IBufferCache diskBufferCache, - ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider, double bloomFilterFalsePositiveRate, - ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, - ILSMIOOperationCallback ioOpCallback, ILSMComponentFilterFrameFactory filterFrameFactory, - LSMComponentFilterManager filterManager, int[] filterFields, boolean durable) { + IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider, + double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, + ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, + ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager, + int[] filterFields, boolean durable) { this.ioManager = ioManager; this.virtualBufferCaches = virtualBufferCaches; this.diskBufferCache = diskBufferCache; @@ -154,12 +157,12 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { } @Override - public void addComponent(ILSMComponent c) throws HyracksDataException { + public void addDiskComponent(ILSMDiskComponent c) throws HyracksDataException { diskComponents.add(0, c); } @Override - public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents) + public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents) throws HyracksDataException { int swapIndex = diskComponents.indexOf(mergedComponents.get(0)); diskComponents.removeAll(mergedComponents); @@ -169,11 +172,11 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { @Override public void changeMutableComponent() { currentMutableComponentId.set((currentMutableComponentId.get() + 1) % memoryComponents.size()); - ((AbstractMemoryLSMComponent) 
memoryComponents.get(currentMutableComponentId.get())).setActive(); + memoryComponents.get(currentMutableComponentId.get()).activate(); } @Override - public List<ILSMComponent> getImmutableComponents() { + public List<ILSMDiskComponent> getImmutableComponents() { return diskComponents; } @@ -210,7 +213,7 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { public boolean isEmptyIndex() { boolean isModified = false; for (ILSMComponent c : memoryComponents) { - AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) c; + AbstractLSMMemoryComponent mutableComponent = (AbstractLSMMemoryComponent) c; if (mutableComponent.isModified()) { isModified = true; break; @@ -232,36 +235,36 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { @Override public boolean isCurrentMutableComponentEmpty() throws HyracksDataException { //check if the current memory component has been modified - return !((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).isModified(); + return !memoryComponents.get(currentMutableComponentId.get()).isModified(); } public void setCurrentMutableComponentState(ComponentState componentState) { - ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setState(componentState); + memoryComponents.get(currentMutableComponentId.get()).setState(componentState); } public ComponentState getCurrentMutableComponentState() { - return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getState(); + return memoryComponents.get(currentMutableComponentId.get()).getState(); } public int getCurrentMutableComponentWriterCount() { - return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getWriterCount(); + return memoryComponents.get(currentMutableComponentId.get()).getWriterCount(); } @Override - public List<ILSMComponent> getInactiveDiskComponents() { + public List<ILSMDiskComponent> 
getInactiveDiskComponents() { return inactiveDiskComponents; } @Override - public void addInactiveDiskComponent(ILSMComponent diskComponent) { + public void addInactiveDiskComponent(ILSMDiskComponent diskComponent) { inactiveDiskComponents.add(diskComponent); } public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); @Override - public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload, - ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException { + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents, + boolean bulkload, ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException { //get set of files to be replicated for this component Set<String> componentFiles = new HashSet<>(); @@ -278,8 +281,8 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { } //create replication job and submit it - LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType, - opType); + LSMIndexReplicationJob job = + new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType, opType); try { diskBufferCache.getIOReplicationManager().submitJob(job); } catch (IOException e) { @@ -300,7 +303,18 @@ public abstract class AbstractLSMIndex implements ILSMIndexInternal { return durable; } - public ILSMComponent getCurrentMemoryComponent() { + @Override + public ILSMMemoryComponent getCurrentMemoryComponent() { return memoryComponents.get(currentMutableComponentId.get()); } + + @Override + public int getCurrentMemoryComponentIndex() { + return currentMutableComponentId.get(); + } + + @Override + public List<ILSMMemoryComponent> getMemoryComponents() { + return memoryComponents; + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java new file mode 100644 index 0000000..bfe7fc0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; + +public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent { + + private final IVirtualBufferCache vbc; + private final AtomicBoolean isModified; + private int writerCount; + private boolean requestedToBeActive; + private final MemoryComponentMetadata metadata; + + public AbstractLSMMemoryComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) { + super(filter); + this.vbc = vbc; + writerCount = 0; + if (isActive) { + state = ComponentState.READABLE_WRITABLE; + } else { + state = ComponentState.INACTIVE; + } + isModified = new AtomicBoolean(); + metadata = new MemoryComponentMetadata(); + } + + @Override + public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException { + if (state == ComponentState.INACTIVE && requestedToBeActive) { + state = ComponentState.READABLE_WRITABLE; + requestedToBeActive = false; + } + switch (opType) { + case FORCE_MODIFICATION: + if (isMutableComponent) { + if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) { + writerCount++; + } else { + return false; + } + } else { + if (state == ComponentState.READABLE_UNWRITABLE + || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { + readerCount++; + } else { + return false; + } + } + break; + case MODIFICATION: + if (isMutableComponent) { + if (state == ComponentState.READABLE_WRITABLE) { + writerCount++; + } else { + return false; + } + } else { + if (state == 
ComponentState.READABLE_UNWRITABLE + || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { + readerCount++; + } else { + return false; + } + } + break; + case REPLICATE: + case SEARCH: + if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE + || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { + readerCount++; + } else { + return false; + } + break; + case FLUSH: + if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) { + if (writerCount != 0) { + throw new IllegalStateException("Trying to flush when writerCount != 0"); + } + state = ComponentState.READABLE_UNWRITABLE_FLUSHING; + readerCount++; + } else { + return false; + } + break; + default: + throw new UnsupportedOperationException("Unsupported operation " + opType); + } + return true; + } + + @Override + public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent) + throws HyracksDataException { + switch (opType) { + case FORCE_MODIFICATION: + case MODIFICATION: + if (isMutableComponent) { + writerCount--; + // A failed operation should not change the component state since it's better for + // the failed operation's effect to be no-op. 
+ if (state == ComponentState.READABLE_WRITABLE && !failedOperation && isFull()) { + state = ComponentState.READABLE_UNWRITABLE; + } + } else { + readerCount--; + if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) { + state = ComponentState.INACTIVE; + } + } + break; + case REPLICATE: + case SEARCH: + readerCount--; + if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) { + state = ComponentState.INACTIVE; + } + break; + case FLUSH: + if (state != ComponentState.READABLE_UNWRITABLE_FLUSHING) { + throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state); + } + readerCount--; + if (readerCount == 0) { + state = ComponentState.INACTIVE; + } else { + state = ComponentState.UNREADABLE_UNWRITABLE; + } + break; + default: + throw new UnsupportedOperationException("Unsupported operation " + opType); + } + + if (readerCount <= -1 || writerCount <= -1) { + throw new IllegalStateException("Invalid reader or writer count " + readerCount + " - " + writerCount); + } + } + + @Override + public boolean isReadable() { + if (state == ComponentState.INACTIVE || state == ComponentState.UNREADABLE_UNWRITABLE) { + return false; + } + return true; + } + + @Override + public void setState(ComponentState state) { + this.state = state; + } + + @Override + public void activate() { + requestedToBeActive = true; + } + + @Override + public void setModified() { + isModified.set(true); + } + + @Override + public boolean isModified() { + return isModified.get(); + } + + @Override + public boolean isFull() { + return vbc.isFull(); + } + + @Override + public void reset() throws HyracksDataException { + isModified.set(false); + metadata.reset(); + } + + @Override + public int getWriterCount() { + return writerCount; + } + + @Override + public MemoryComponentMetadata getMetadata() { + return metadata; + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java deleted file mode 100644 index 500996f..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.storage.am.lsm.common.impls; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; -import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; - -public abstract class AbstractMemoryLSMComponent extends AbstractLSMComponent { - - private final IVirtualBufferCache vbc; - private final AtomicBoolean isModified; - private int writerCount; - private boolean requestedToBeActive; - - public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter, - long mostRecentMarkerLSN) { - super(filter, mostRecentMarkerLSN); - this.vbc = vbc; - writerCount = 0; - if (isActive) { - state = ComponentState.READABLE_WRITABLE; - } else { - state = ComponentState.INACTIVE; - } - isModified = new AtomicBoolean(); - } - - public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) { - this(vbc, isActive, filter, -1L); - } - - public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive) { - this(vbc, isActive, null); - } - - @Override - public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException { - if (state == ComponentState.INACTIVE && requestedToBeActive) { - state = ComponentState.READABLE_WRITABLE; - requestedToBeActive = false; - } - switch (opType) { - case FORCE_MODIFICATION: - if (isMutableComponent) { - if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) { - writerCount++; - } else { - return false; - } - } else { - if (state == ComponentState.READABLE_UNWRITABLE - || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { - readerCount++; - } else { - return false; - } - } - break; - case MODIFICATION: - if (isMutableComponent) { - if 
(state == ComponentState.READABLE_WRITABLE) { - writerCount++; - } else { - return false; - } - } else { - if (state == ComponentState.READABLE_UNWRITABLE - || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { - readerCount++; - } else { - return false; - } - } - break; - case REPLICATE: - case SEARCH: - if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE - || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) { - readerCount++; - } else { - return false; - } - break; - case FLUSH: - if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) { - if (writerCount != 0) { - throw new IllegalStateException("Trying to flush when writerCount != 0"); - } - state = ComponentState.READABLE_UNWRITABLE_FLUSHING; - readerCount++; - } else { - return false; - } - break; - default: - throw new UnsupportedOperationException("Unsupported operation " + opType); - } - return true; - } - - @Override - public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent) - throws HyracksDataException { - switch (opType) { - case FORCE_MODIFICATION: - case MODIFICATION: - if (isMutableComponent) { - writerCount--; - //A failed operation should not change the component state since it's better for the failed operation's effect to be no-op. 
- if (state == ComponentState.READABLE_WRITABLE && !failedOperation && isFull()) { - state = ComponentState.READABLE_UNWRITABLE; - } - } else { - readerCount--; - if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) { - state = ComponentState.INACTIVE; - } - } - break; - case REPLICATE: - case SEARCH: - readerCount--; - if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) { - state = ComponentState.INACTIVE; - } - break; - case FLUSH: - if (state != ComponentState.READABLE_UNWRITABLE_FLUSHING) { - throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state); - } - readerCount--; - if (readerCount == 0) { - state = ComponentState.INACTIVE; - } else { - state = ComponentState.UNREADABLE_UNWRITABLE; - } - break; - default: - throw new UnsupportedOperationException("Unsupported operation " + opType); - } - - if (readerCount <= -1 || writerCount <= -1) { - throw new IllegalStateException("Invalid reader or writer count " + readerCount + " - " + writerCount); - } - } - - public boolean isReadable() { - if (state == ComponentState.INACTIVE || state == ComponentState.UNREADABLE_UNWRITABLE) { - return false; - } - return true; - } - - @Override - public LSMComponentType getType() { - return LSMComponentType.MEMORY; - } - - @Override - public ComponentState getState() { - return state; - } - - public void setState(ComponentState state) { - this.state = state; - } - - public void setActive() { - requestedToBeActive = true; - } - - public void setIsModified() { - isModified.set(true); - } - - public boolean isModified() { - return isModified.get(); - } - - public boolean isFull() { - return vbc.isFull(); - } - - protected void reset() throws HyracksDataException { - isModified.set(false); - if (filter != null) { - filter.reset(); - } - } - - public int getWriterCount() { - return writerCount; - } -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BTreeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BTreeFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BTreeFactory.java index 6164c0a..562ed5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BTreeFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BTreeFactory.java @@ -25,7 +25,6 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; -import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; @@ -39,7 +38,7 @@ public class BTreeFactory extends TreeIndexFactory<BTree> { } @Override - public BTree createIndexInstance(FileReference file) throws IndexException { + public BTree createIndexInstance(FileReference file) { return new BTree(bufferCache, fileMapProvider, freePageManagerFactory.createPageManager(bufferCache), interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java index a9a2129..8ae535b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; @@ -48,13 +49,14 @@ public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallba } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { wrappedCallback.afterOperation(opType, oldComponents, newComponent); } @Override - public synchronized void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) throws HyracksDataException { + public synchronized void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) + throws HyracksDataException { wrappedCallback.afterFinalize(opType, newComponent); notifyAll(); notified = true; 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 7340fdb..b3e1f6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -37,19 +38,19 @@ public class ConstantMergePolicy implements ILSMMergePolicy { @Override public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException, IndexException { - List<ILSMComponent> immutableComponents = index.getImmutableComponents(); + List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents(); if (!areComponentsMergable(immutableComponents)) { return; } if (fullMergeIsRequested) { - ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - 
NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFullMerge(index.getIOOperationCallback()); } else if (immutableComponents.size() >= numComponents) { - ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); } } @@ -85,7 +86,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { * there will be no new merge either in this situation. */ - List<ILSMComponent> immutableComponents = index.getImmutableComponents(); + List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents(); int totalImmutableComponentCount = immutableComponents.size(); // [case 1] @@ -105,8 +106,8 @@ public class ConstantMergePolicy implements ILSMMergePolicy { if (!areComponentsMergable(immutableComponents)) { throw new IllegalStateException(); } - ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); return true; } @@ -118,7 +119,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { * @param immutableComponents * @return true if all components are mergable, false otherwise. 
*/ - private boolean areComponentsMergable(List<ILSMComponent> immutableComponents) { + private boolean areComponentsMergable(List<ILSMDiskComponent> immutableComponents) { for (ILSMComponent c : immutableComponents) { if (c.getState() != ComponentState.READABLE_UNWRITABLE) { return false; @@ -133,7 +134,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { * * @return true if there is an ongoing merge operation, false otherwise. */ - private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) { + private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { int size = immutableComponents.size(); for (int i = 0; i < size; i++) { if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java new file mode 100644 index 0000000..2da954d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; +import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; + +public class DiskComponentMetadata implements IComponentMetadata { + + private final IMetadataPageManager mdpManager; + + public DiskComponentMetadata(IMetadataPageManager mdpManager) { + this.mdpManager = mdpManager; + } + + @Override + public void put(IValueReference key, IValueReference value) throws HyracksDataException { + mdpManager.put(mdpManager.createMetadataFrame(), key, value); + } + + @Override + public void get(IValueReference key, IPointable value) throws HyracksDataException { + mdpManager.get(mdpManager.createMetadataFrame(), key, value); + } + + @Override + public IValueReference get(IValueReference key) throws HyracksDataException { + IPointable value = VoidPointable.FACTORY.createPointable(); + get(key, value); + return value; + } + + public void put(MemoryComponentMetadata metadata) throws HyracksDataException { + metadata.copy(mdpManager); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index e4be66b..c6346cc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -29,10 +29,11 @@ import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -42,7 +43,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; public class ExternalIndexHarness extends LSMHarness { private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName()); - public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, + public 
ExternalIndexHarness(ILSMIndex lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) { super(lsmIndex, mergePolicy, opTracker, replicationEnabled); } @@ -112,7 +113,7 @@ public class ExternalIndexHarness extends LSMHarness { return true; } - private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent, + private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMDiskComponent newComponent, boolean failedOperation) throws HyracksDataException, IndexException { /** * FLUSH and MERGE operations should always exit the components @@ -130,11 +131,11 @@ public class ExternalIndexHarness extends LSMHarness { case INACTIVE: if (replicationEnabled) { componentsToBeReplicated.clear(); - componentsToBeReplicated.add(c); + componentsToBeReplicated.add((ILSMDiskComponent) c); lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, opType); } - ((AbstractDiskLSMComponent) c).destroy(); + ((ILSMDiskComponent) c).destroy(); break; default: break; @@ -233,7 +234,7 @@ public class ExternalIndexHarness extends LSMHarness { LOGGER.info("Started a merge operation for index: " + lsmIndex + " ..."); } - ILSMComponent newComponent = null; + ILSMDiskComponent newComponent = null; try { newComponent = lsmIndex.merge(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); @@ -248,10 +249,10 @@ public class ExternalIndexHarness extends LSMHarness { } @Override - public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException { + public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException, IndexException { lsmIndex.markAsValid(c); synchronized (opTracker) { - lsmIndex.addComponent(c); + lsmIndex.addDiskComponent(c); if (replicationEnabled) { componentsToBeReplicated.clear(); componentsToBeReplicated.add(c); @@ -267,11 
+268,11 @@ public class ExternalIndexHarness extends LSMHarness { // 1. this needs synchronization since others might be accessing the index (specifically merge operations that might change the lists of components) // 2. the actions taken by the index itself are different // 3. the component has already been marked valid by the bulk update operation - public void addTransactionComponents(ILSMComponent newComponent) throws HyracksDataException, IndexException { + public void addTransactionComponents(ILSMDiskComponent newComponent) throws HyracksDataException, IndexException { ITwoPCIndex index = (ITwoPCIndex) lsmIndex; synchronized (opTracker) { - List<ILSMComponent> newerList; - List<ILSMComponent> olderList; + List<ILSMDiskComponent> newerList; + List<ILSMDiskComponent> olderList; if (index.getCurrentVersion() == 0) { newerList = index.getFirstComponentList(); olderList = index.getSecondComponentList(); @@ -281,11 +282,11 @@ public class ExternalIndexHarness extends LSMHarness { } // Exit components in old version of the index so they are ready to be // deleted if they are not needed anymore - for (ILSMComponent c : olderList) { + for (ILSMDiskComponent c : olderList) { exitComponent(c); } // Enter components in the newer list - for (ILSMComponent c : newerList) { + for (ILSMDiskComponent c : newerList) { enterComponent(c); } if (newComponent != null) { @@ -320,7 +321,7 @@ public class ExternalIndexHarness extends LSMHarness { if (index.getFirstComponentList().containsAll(mergedComponents)) { // exit un-needed components for (ILSMComponent c : mergedComponents) { - exitComponent(c); + exitComponent((ILSMDiskComponent) c); } // enter new component enterComponent(newComponent); @@ -329,7 +330,7 @@ public class ExternalIndexHarness extends LSMHarness { if (index.getSecondComponentList().containsAll(mergedComponents)) { // exit un-needed components for (ILSMComponent c : mergedComponents) { - exitComponent(c); + exitComponent((ILSMDiskComponent) c); } // enter new 
component enterComponent(newComponent); @@ -342,7 +343,7 @@ public class ExternalIndexHarness extends LSMHarness { diskComponent.threadEnter(LSMOperationType.SEARCH, false); } - private void exitComponent(ILSMComponent diskComponent) throws HyracksDataException { + private void exitComponent(ILSMDiskComponent diskComponent) throws HyracksDataException { diskComponent.threadExit(LSMOperationType.SEARCH, false, false); if (diskComponent.getState() == ILSMComponent.ComponentState.INACTIVE) { if (replicationEnabled) { @@ -350,7 +351,7 @@ public class ExternalIndexHarness extends LSMHarness { componentsToBeReplicated.add(diskComponent); lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null); } - ((AbstractDiskLSMComponent) diskComponent).destroy(); + diskComponent.destroy(); } } @@ -368,10 +369,10 @@ public class ExternalIndexHarness extends LSMHarness { public void indexClear() throws HyracksDataException { ITwoPCIndex index = (ITwoPCIndex) lsmIndex; - for (ILSMComponent c : index.getFirstComponentList()) { + for (ILSMDiskComponent c : index.getFirstComponentList()) { exitComponent(c); } - for (ILSMComponent c : index.getSecondComponentList()) { + for (ILSMDiskComponent c : index.getSecondComponentList()) { exitComponent(c); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexFactory.java index f6d21d8..1f7d7a3 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexFactory.java @@ -24,7 +24,6 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; -import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; @@ -43,7 +42,7 @@ public abstract class IndexFactory<T extends IIndex> { this.freePageManagerFactory = freePageManagerFactory; } - public abstract T createIndexInstance(FileReference file) throws IndexException, HyracksDataException; + public abstract T createIndexInstance(FileReference file) throws HyracksDataException; public IBufferCache getBufferCache() { return bufferCache; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterFactory.java index b69ecf2..46a629a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterFactory.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterFactory.java @@ -35,7 +35,7 @@ public class LSMComponentFilterFactory implements ILSMComponentFilterFactory { } @Override - public ILSMComponentFilter createLSMComponentFilter() { + public ILSMComponentFilter createFilter() { return new LSMComponentFilter(tupleWriterFactory.createTupleWriter(), filterCmpFactories); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java index a8b4c70..7f8e990 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java @@ -42,7 +42,7 @@ public class LSMComponentFilterManager implements ILSMComponentFilterManager { } @Override - public void updateFilterInfo(ILSMComponentFilter filter, List<ITupleReference> filterTuples) + public void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples) throws HyracksDataException { MultiComparator filterCmp = MultiComparator.create(filter.getFilterCmpFactories()); for (ITupleReference tuple : filterTuples) { @@ -51,7 +51,7 @@ public class LSMComponentFilterManager implements ILSMComponentFilterManager { } @Override - public void 
writeFilterInfo(ILSMComponentFilter filter, ITreeIndex treeIndex) throws HyracksDataException { + public void writeFilter(ILSMComponentFilter filter, ITreeIndex treeIndex) throws HyracksDataException { IMetadataPageManager treeMetaManager = (IMetadataPageManager) treeIndex.getPageManager(); ILSMComponentFilterReference filterFrame = filterFrameFactory.createFrame(); try { @@ -67,7 +67,7 @@ public class LSMComponentFilterManager implements ILSMComponentFilterManager { } @Override - public boolean readFilterInfo(ILSMComponentFilter filter, ITreeIndex treeIndex) throws HyracksDataException { + public boolean readFilter(ILSMComponentFilter filter, ITreeIndex treeIndex) throws HyracksDataException { IMetadataPageManager treeMetaManager = (IMetadataPageManager) treeIndex.getPageManager(); ILSMComponentFilterReference filterFrame = filterFrameFactory.createFrame(); treeMetaManager.get(treeMetaManager.createMetadataFrame(), FILTER_KEY, filterFrame); @@ -78,7 +78,7 @@ public class LSMComponentFilterManager implements ILSMComponentFilterManager { List<ITupleReference> filterTuples = new ArrayList<>(); filterTuples.add(filterFrame.getMinTuple()); filterTuples.add(filterFrame.getMaxTuple()); - updateFilterInfo(filter, filterTuples); + updateFilter(filter, filterTuples); return true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index a6868f0..6bf9312 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -34,14 +34,15 @@ import org.apache.hyracks.storage.am.common.api.ISearchPredicate; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -50,14 +51,14 @@ import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; public class LSMHarness implements ILSMHarness { private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName()); - protected final ILSMIndexInternal lsmIndex; + protected final ILSMIndex lsmIndex; protected final ILSMMergePolicy mergePolicy; protected final ILSMOperationTracker opTracker; 
protected final AtomicBoolean fullMergeIsRequested; protected final boolean replicationEnabled; - protected List<ILSMComponent> componentsToBeReplicated; + protected List<ILSMDiskComponent> componentsToBeReplicated; - public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, + public LSMHarness(ILSMIndex lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) { this.lsmIndex = lsmIndex; this.opTracker = opTracker; @@ -80,7 +81,7 @@ public class LSMHarness implements ILSMHarness { switch (opType) { case FLUSH: ILSMComponent flushingComponent = ctx.getComponentHolder().get(0); - if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) { + if (!((AbstractLSMMemoryComponent) flushingComponent).isModified()) { //The mutable component has not been modified by any writer. There is nothing to flush. //since the component is empty, set its state back to READABLE_WRITABLE if (((AbstractLSMIndex) lsmIndex) @@ -90,7 +91,7 @@ public class LSMHarness implements ILSMHarness { } return false; } - if (((AbstractMemoryLSMComponent) flushingComponent).getWriterCount() > 0) { + if (((AbstractLSMMemoryComponent) flushingComponent).getWriterCount() > 0) { /* * This case is a case where even though FLUSH log was flushed to disk and scheduleFlush is triggered, * the current in-memory component (whose state was changed to READABLE_WRITABLE (RW) @@ -172,11 +173,7 @@ public class LSMHarness implements ILSMHarness { lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH); // Changing the flush status should *always* precede changing the mutable component. lsmIndex.changeFlushStatusForCurrentMutableCompoent(false); - // Flushing! 
=> carry over the marker lsn to the next component - long mostRecentMarkerLSN = - ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().getMostRecentMarkerLSN(); lsmIndex.changeMutableComponent(); - ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().setMostRecentMarkerLSN(mostRecentMarkerLSN); // Notify all waiting threads whenever a flush has been scheduled since they will check // again if they can grab and enter the mutable component. opTracker.notifyAll(); @@ -190,7 +187,7 @@ public class LSMHarness implements ILSMHarness { return true; } - private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent, + private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMDiskComponent newComponent, boolean failedOperation) throws HyracksDataException, IndexException { /** * FLUSH and MERGE operations should always exit the components @@ -199,8 +196,8 @@ public class LSMHarness implements ILSMHarness { if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) { return; } - List<ILSMComponent> inactiveDiskComponents = null; - List<ILSMComponent> inactiveDiskComponentsToBeDeleted = null; + List<ILSMDiskComponent> inactiveDiskComponents = null; + List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = null; try { synchronized (opTracker) { try { @@ -237,7 +234,7 @@ public class LSMHarness implements ILSMHarness { } break; case INACTIVE: - ((AbstractMemoryLSMComponent) c).reset(); + ((AbstractLSMMemoryComponent) c).reset(); // Notify all waiting threads whenever the mutable component's has change to inactive. This is important because // even though we switched the mutable components, it is possible that the component that we just switched // to is still busy flushing its data to disk. 
Thus, the notification that was issued upon scheduling the flush @@ -250,7 +247,7 @@ public class LSMHarness implements ILSMHarness { } else { switch (c.getState()) { case INACTIVE: - lsmIndex.addInactiveDiskComponent(c); + lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c); break; default: break; @@ -264,7 +261,7 @@ public class LSMHarness implements ILSMHarness { case FLUSH: // newComponent is null if the flush op. was not performed. if (newComponent != null) { - lsmIndex.addComponent(newComponent); + lsmIndex.addDiskComponent(newComponent); if (replicationEnabled) { componentsToBeReplicated.clear(); componentsToBeReplicated.add(newComponent); @@ -311,8 +308,8 @@ public class LSMHarness implements ILSMHarness { */ inactiveDiskComponents = lsmIndex.getInactiveDiskComponents(); if (!inactiveDiskComponents.isEmpty()) { - for (ILSMComponent inactiveComp : inactiveDiskComponents) { - if (((AbstractDiskLSMComponent) inactiveComp).getFileReferenceCount() == 1) { + for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) { + if (inactiveComp.getFileReferenceCount() == 1) { if (inactiveDiskComponentsToBeDeleted == null) { inactiveDiskComponentsToBeDeleted = new LinkedList<>(); } @@ -338,7 +335,7 @@ public class LSMHarness implements ILSMHarness { } for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) { - ((AbstractDiskLSMComponent) c).destroy(); + ((AbstractLSMDiskComponent) c).destroy(); } } catch (Throwable e) { e.printStackTrace(); @@ -375,8 +372,8 @@ public class LSMHarness implements ILSMHarness { try { lsmIndex.modify(ctx, tuple); // The mutable component is always in the first index. 
- AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) ctx.getComponentHolder().get(0); - mutableComponent.setIsModified(); + AbstractLSMMemoryComponent mutableComponent = (AbstractLSMMemoryComponent) ctx.getComponentHolder().get(0); + mutableComponent.setModified(); } catch (Exception e) { failedOperation = true; throw e; @@ -429,7 +426,7 @@ public class LSMHarness implements ILSMHarness { LOGGER.info("Started a flush operation for index: " + lsmIndex + " ..."); } - ILSMComponent newComponent = null; + ILSMDiskComponent newComponent = null; try { newComponent = lsmIndex.flush(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); @@ -477,7 +474,7 @@ public class LSMHarness implements ILSMHarness { LOGGER.info("Started a merge operation for index: " + lsmIndex + " ..."); } - ILSMComponent newComponent = null; + ILSMDiskComponent newComponent = null; try { newComponent = lsmIndex.merge(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); @@ -495,10 +492,10 @@ public class LSMHarness implements ILSMHarness { } @Override - public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException { + public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException, IndexException { lsmIndex.markAsValid(c); synchronized (opTracker) { - lsmIndex.addComponent(c); + lsmIndex.addDiskComponent(c); if (replicationEnabled) { componentsToBeReplicated.clear(); componentsToBeReplicated.add(c); @@ -513,16 +510,16 @@ public class LSMHarness implements ILSMHarness { return opTracker; } - protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType) + protected void triggerReplication(List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException { - ILSMIndexAccessorInternal accessor = + ILSMIndexAccessor accessor = 
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleReplication(lsmComponents, bulkload, opType); } @Override - public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload, - LSMOperationType opType) throws HyracksDataException { + public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents, + boolean bulkload, LSMOperationType opType) throws HyracksDataException { //enter the LSM components to be replicated to prevent them from being deleted until they are replicated if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 92155e4..4199cfb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -27,15 +27,15 @@ import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessorInternal { +public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor { protected ILSMHarness lsmHarness; protected ILSMIndexOperationContext ctx; @@ -124,7 +124,7 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessorInternal } @Override - public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components) + public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) throws HyracksDataException, IndexException { ctx.setOperation(IndexOperation.MERGE); ctx.getComponentsToBeMerged().clear(); @@ -133,7 +133,7 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessorInternal } @Override - public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType) + public void scheduleReplication(List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException { ctx.setOperation(IndexOperation.REPLICATE); ctx.getComponentsToBeReplicated().clear(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java new file mode 100644 index 0000000..dcc9355 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
+
+public class MemoryComponentMetadata implements IComponentMetadata {
+    private static final byte[] empty = new byte[0];
+    private final List<Pair<IValueReference, ArrayBackedValueStorage>> store =
+            new ArrayList<>(); // one pair per distinct key; looked up by linear scan in get(key)
+
+    /**
+     * Note: the key reference is stored as-is on first insertion, so for memory metadata it must stay constant
+     */
+    @Override
+    public void put(IValueReference key, IValueReference value) {
+        ArrayBackedValueStorage stored = get(key);
+        if (stored == null) {
+            stored = new ArrayBackedValueStorage();
+            store.add(Pair.of(key, stored)); // register the key once; later puts reuse this storage
+        }
+        stored.assign(value); // overwrite in place, whether the entry is new or already existed
+    }
+
+    /**
+     * Note: for memory metadata, it is expected that the key will be constant; value stays empty if key is absent
+     */
+    @Override
+    public void get(IValueReference key, IPointable value) {
+        value.set(empty, 0, 0); // reset first so a missing key yields a zero-length value
+        ArrayBackedValueStorage stored = get(key);
+        if (stored != null) {
+            value.set(stored);
+        }
+    }
+
+    @Override
+    public ArrayBackedValueStorage get(IValueReference key) {
+        for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+            if (pair.getKey().equals(key)) { // match is decided by the key type's equals()
+                return pair.getValue();
+            }
+        }
+        return null; // no entry has been put for this key
+    }
+
+    public void copy(IMetadataPageManager mdpManager) throws HyracksDataException {
+        ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
+        for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+            mdpManager.put(frame, pair.getKey(), pair.getValue()); // write every stored pair through the manager
+        }
+    }
+
+    public void copy(DiskComponentMetadata metadata) throws HyracksDataException {
+        metadata.put(this); // the disk-side metadata object performs the transfer
+    }
+
+    public void reset() {
+        store.clear(); // drops all keys and their storages
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
index 0713d7d..dc69092 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
@@ -22,13 +22,15 @@ import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
-public enum NoOpIOOperationCallback implements ILSMIOOperationCallback, ILSMIOOperationCallbackProvider, ILSMIOOperationCallbackFactory {
+public enum NoOpIOOperationCallback
+        implements ILSMIOOperationCallback, ILSMIOOperationCallbackProvider,
ILSMIOOperationCallbackFactory { INSTANCE; @Override @@ -37,13 +39,13 @@ public enum NoOpIOOperationCallback implements ILSMIOOperationCallback, ILSMIOOp } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { // Do nothing. } @Override - public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) throws HyracksDataException { + public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { // Do nothing. } @@ -53,7 +55,7 @@ public enum NoOpIOOperationCallback implements ILSMIOOperationCallback, ILSMIOOp } @Override - public ILSMIOOperationCallback createIOOperationCallback() { + public ILSMIOOperationCallback createIoOpCallback() { return INSTANCE; }
