[ASTERIXDB-2115] Add Component Ids to LSM Indexes - user model changes: no - storage format changes: no - interface changes: yes
Details: - Add LSMComponentId to all LSM components. Component Ids are managed through IO operation callbacks. - For memory component, it's ID is reset every time it's recycled. - For disk component, it's ID is copied from the source component(s) during flush/merge - For indexes of a dataset, we need to guarantee all their memory components should recieve the same ID. This is achieved using a shared component Id generator. - Fix memory component recycled callback, make sure it's called only when we've indeed recycled the memory component A design wiki for this patch: https://cwiki.apache.org/confluence/display/ ASTERIXDB/Component+Id-based+secondary-to-primary+index+acceleration Change-Id: I8aec6261a84a0729ce35f4b1cb708be299ddb98d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2025 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Contrib: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/39390edc Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/39390edc Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/39390edc Branch: refs/heads/master Commit: 39390edc9d9a6a95fd312acf63fee9801c17a98b Parents: 23761dd Author: luochen01 <[email protected]> Authored: Thu Nov 9 21:14:06 2017 -0800 Committer: Luo Chen <[email protected]> Committed: Fri Nov 10 08:57:45 2017 -0800 ---------------------------------------------------------------------- .../asterix/app/translator/QueryTranslator.java | 7 +- .../test/dataflow/ComponentRollbackTest.java | 11 +- .../asterix/test/dataflow/TestDataset.java | 3 +- .../TestLsmBtreeIoOpCallbackFactory.java | 20 ++-- .../common/api/IDatasetLifecycleManager.java | 9 ++ .../context/CorrelatedPrefixMergePolicy.java | 32 +++--- .../DatasetLSMComponentIdGeneratorFactory.java | 50 +++++++++ .../common/context/DatasetLifecycleManager.java | 16 ++- .../asterix/common/context/DatasetResource.java | 10 +- .../context/PrimaryIndexOperationTracker.java | 7 +- .../AbstractLSMIOOperationCallback.java | 65 ++++------- ...tractLSMIndexIOOperationCallbackFactory.java | 48 ++++++++ .../LSMBTreeIOOperationCallback.java | 5 +- .../LSMBTreeIOOperationCallbackFactory.java | 11 +- .../LSMBTreeWithBuddyIOOperationCallback.java | 5 +- ...TreeWithBuddyIOOperationCallbackFactory.java | 12 +- .../LSMInvertedIndexIOOperationCallback.java | 5 +- ...InvertedIndexIOOperationCallbackFactory.java | 12 +- .../LSMRTreeIOOperationCallback.java | 5 +- .../LSMRTreeIOOperationCallbackFactory.java | 11 +- .../CorrelatedPrefixMergePolicyTest.java | 90 ++++++++------- .../LSMBTreeIOOperationCallbackTest.java | 7 +- ...SMBTreeWithBuddyIOOperationCallbackTest.java | 7 +- ...LSMInvertedIndexIOOperationCallbackTest.java | 7 +- .../LSMRTreeIOOperationCallbackTest.java | 7 +- .../metadata/bootstrap/MetadataBootstrap.java | 7 +- .../metadata/declared/MetadataProvider.java | 28 +++-- .../asterix/metadata/entities/Dataset.java | 14 ++- .../utils/SecondaryIndexOperationsHelper.java | 20 ++-- .../LSMIndexBulkLoadOperatorDescriptor.java | 64 +++++++++++ .../LSMIndexBulkLoadOperatorNodePushable.java | 111 +++++++++++++++++++ .../LSMSecondaryIndexBulkLoadNodePushable.java | 56 ++-------- .../IndexBulkLoadOperatorNodePushable.java | 6 +- .../dataflow/ExternalBTreeLocalResource.java | 1 + .../ExternalBTreeWithBuddyLocalResource.java | 1 + .../btree/dataflow/LSMBTreeLocalResource.java | 1 + .../lsm/btree/impls/LSMBTreeDiskComponent.java | 3 +- .../am/lsm/common/api/ILSMComponent.java | 7 ++ .../am/lsm/common/api/ILSMComponentId.java | 41 +++++++ .../common/api/ILSMComponentIdGenerator.java | 37 +++++++ .../api/ILSMComponentIdGeneratorFactory.java | 29 +++++ .../am/lsm/common/api/ILSMDiskComponent.java | 8 -- .../am/lsm/common/api/ILSMDiskComponentId.java | 51 --------- .../api/ILSMIOOperationCallbackFactory.java | 10 +- .../am/lsm/common/api/ILSMMemoryComponent.java | 8 ++ .../common/impls/AbstractLSMDiskComponent.java | 36 ++++-- .../am/lsm/common/impls/AbstractLSMIndex.java | 22 ++++ .../impls/AbstractLSMMemoryComponent.java | 20 ++++ .../am/lsm/common/impls/EmptyComponent.java | 6 +- .../am/lsm/common/impls/LSMComponentId.java | 107 ++++++++++++++++++ .../common/impls/LSMComponentIdGenerator.java | 66 +++++++++++ .../impls/LSMComponentIdGeneratorFactory.java | 37 +++++++ .../am/lsm/common/impls/LSMDiskComponentId.java | 73 ------------ .../storage/am/lsm/common/impls/LSMHarness.java | 15 ++- .../impls/LSMIndexDiskComponentBulkLoader.java | 11 +- .../impls/NoOpIOOperationCallbackFactory.java | 6 + .../am/lsm/common/util/LSMComponentIdUtils.java | 62 +++++++++++ .../dataflow/LSMInvertedIndexLocalResource.java | 1 + .../dataflow/ExternalRTreeLocalResource.java | 1 + .../rtree/dataflow/LSMRTreeLocalResource.java | 1 + .../LSMRTreeWithAntiMatterLocalResource.java | 1 + .../impls/LSMRTreeWithAntiMatterTuples.java | 4 +- .../btree/impl/TestLsmBtreeLocalResource.java | 1 + 63 files changed, 1050 insertions(+), 385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index d6457f2..494eb65 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -993,8 +993,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. create the index artifact in NC. runJob(hcc, spec, jobFlags); - // #. flush the internal dataset for correlated policy - if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) { + // #. flush the internal dataset + // We need this to guarantee the correctness of component Id acceleration for secondary-to-primary index. + // Otherwise, the new secondary index component would corresponding to a partial memory component + // of the primary index, which is incorrect. + if (ds.getDatasetType() == DatasetType.INTERNAL) { FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 6bff50d..00b185d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -197,6 +197,7 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); @@ -204,6 +205,7 @@ public class ComponentRollbackTest { lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); lsmAccessor.deleteComponents(pred); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); } catch (Throwable e) { @@ -246,6 +248,7 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); @@ -270,6 +273,7 @@ public class ComponentRollbackTest { lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); lsmAccessor.deleteComponents(pred); searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); } catch (Throwable e) { @@ -316,6 +320,7 @@ public class ComponentRollbackTest { firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); // rollback a memory component lsmAccessor.deleteComponents( c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified())); @@ -335,6 +340,7 @@ public class ComponentRollbackTest { lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); lsmAccessor.deleteComponents(pred); // now that the rollback has completed, we will unblock the search lsmBtree.addSearchCallback(sem -> sem.release()); @@ -731,7 +737,7 @@ public class ComponentRollbackTest { } private class Rollerback { - private Thread task; + private final Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) { @@ -740,6 +746,7 @@ public class ComponentRollbackTest { @Override public void run() { ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); try { lsmAccessor.deleteComponents(predicate); } catch (HyracksDataException e) { @@ -760,7 +767,7 @@ public class ComponentRollbackTest { } private class Searcher { - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private Future<Boolean> task; private volatile boolean entered = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java index 893b428..e0502de 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.dataflow; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -62,6 +63,6 @@ public class TestDataset extends Dataset { @Override public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException { - return TestLsmBtreeIoOpCallbackFactory.INSTANCE; + return new TestLsmBtreeIoOpCallbackFactory(new DatasetLSMComponentIdGeneratorFactory(getDatasetId())); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index 142bcc5..fa37c20 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -18,19 +18,20 @@ */ package org.apache.asterix.test.dataflow; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent; -public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory { +public class TestLsmBtreeIoOpCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory { private static final long serialVersionUID = 1L; - public static TestLsmBtreeIoOpCallbackFactory INSTANCE = new TestLsmBtreeIoOpCallbackFactory(); private static volatile int completedFlushes = 0; private static volatile int completedMerges = 0; private static volatile int rollbackFlushes = 0; @@ -38,7 +39,8 @@ public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackF private static volatile int failedFlushes = 0; private static volatile int failedMerges = 0; - private TestLsmBtreeIoOpCallbackFactory() { + public TestLsmBtreeIoOpCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + super(idGeneratorFactory); } @Override @@ -50,7 +52,7 @@ public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackF // Whenever this is called, it resets the counter // However, the counters for the failed operations are never reset since we expect them // To be always 0 - return new TestLsmBtreeIoOpCallback(index); + return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator()); } public int getTotalFlushes() { @@ -90,14 +92,14 @@ public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackF } public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback { - public TestLsmBtreeIoOpCallback(ILSMIndex index) { - super(index); + public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) { + super(index, idGenerator); } @Override public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) { super.afterFinalize(opType, newComponent); - synchronized (INSTANCE) { + synchronized (TestLsmBtreeIoOpCallbackFactory.this) { if (newComponent != null) { if (newComponent == EmptyComponent.INSTANCE) { if (opType == LSMIOOperationType.FLUSH) { @@ -115,7 +117,7 @@ public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackF } else { recordFailure(opType); } - INSTANCE.notifyAll(); + TestLsmBtreeIoOpCallbackFactory.this.notifyAll(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index a32d4dc..41c5ade 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.context.IndexInfo; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.IResourceLifecycleManager; @@ -79,6 +80,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd PrimaryIndexOperationTracker getOperationTracker(int datasetId); /** + * creates (if necessary) and returns the component Id generator of a dataset. + * + * @param datasetId + * @return + */ + ILSMComponentIdGenerator getComponentIdGenerator(int datasetId); + + /** * creates (if necessary) and returns the dataset virtual buffer caches. * * @param datasetId http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index b877cfb..e5fc998 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -29,11 +29,14 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { @@ -86,27 +89,26 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { //nothing to merge return false; } - long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId(); - long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId(); - + ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft()); + ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight()); + ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId()); Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(minID, maxID, + triggerScheduledMerge(targetId, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); return true; } /** - * Submit merge requests for all disk components within [minID, maxID] + * Submit merge requests for all disk components within the range specified by targetId * of all indexes of a given dataset in the given partition * - * @param minID - * @param maxID - * @param partition + * @param targetId * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException { + private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos) + throws HyracksDataException { for (IndexInfo info : indexInfos) { ILSMIndex lsmIndex = info.getIndex(); @@ -116,13 +118,13 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { } List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); for (ILSMDiskComponent component : immutableComponents) { - ILSMDiskComponentId id = component.getComponentId(); - if (id.getMinId() >= minID && id.getMaxId() <= maxID) { + ILSMComponentId id = component.getId(); + IdCompareResult cmp = targetId.compareTo(id); + if (cmp == IdCompareResult.INCLUDE) { mergableComponents.add(component); - } - if (id.getMaxId() < minID) { + } else if (cmp == IdCompareResult.GREATER_THAN) { //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs) - //if the component.maxID < minID, we can safely skip the rest disk components in the list + // if targetId>component.Id, we can safely skip the rest disk components in the list break; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java new file mode 100644 index 0000000..7b8397c --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.common.context; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; + +/** + * This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition) + * use the same Id generator. This guarantees their memory components would receive the same Id upon + * activation. + * + */ +public class DatasetLSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory { + private static final long serialVersionUID = 1L; + + private final int datasetId; + + public DatasetLSMComponentIdGeneratorFactory(int datasetId) { + this.datasetId = datasetId; + } + + @Override + public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) { + IDatasetLifecycleManager dslcManager = + ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager(); + return dslcManager.getComponentIdGenerator(datasetId); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 6282509..1e99fb5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -43,11 +43,13 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; @@ -233,10 +235,12 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC dsr = datasets.get(did); if (dsr == null) { DatasetInfo dsInfo = new DatasetInfo(did); - PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo); + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + PrimaryIndexOperationTracker opTracker = + new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator); DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties, memoryManager.getNumPages(did), numPartitions); - dsr = new DatasetResource(dsInfo, opTracker, vbcs); + dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator); datasets.put(did, dsr); } return dsr; @@ -319,6 +323,11 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC return datasets.get(datasetId).getOpTracker(); } + @Override + public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) { + return datasets.get(datasetId).getIdGenerator(); + } + private void validateDatasetLifecycleManagerState() throws HyracksDataException { if (stopped) { throw new HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was stopped."); @@ -404,6 +413,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } } + ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); + idGenerator.refresh(); + if (asyncFlush) { for (IndexInfo iInfo : dsInfo.getIndexes().values()) { ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index 79ae1da..f6e2b0d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -42,12 +43,15 @@ public class DatasetResource implements Comparable<DatasetResource> { private final DatasetInfo datasetInfo; private final PrimaryIndexOperationTracker datasetPrimaryOpTracker; private final DatasetVirtualBufferCaches datasetVirtualBufferCaches; + private final ILSMComponentIdGenerator datasetComponentIdGenerator; public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker, - DatasetVirtualBufferCaches datasetVirtualBufferCaches) { + DatasetVirtualBufferCaches datasetVirtualBufferCaches, + ILSMComponentIdGenerator datasetComponentIdGenerator) { this.datasetInfo = datasetInfo; this.datasetPrimaryOpTracker = datasetPrimaryOpTracker; this.datasetVirtualBufferCaches = datasetVirtualBufferCaches; + this.datasetComponentIdGenerator = datasetComponentIdGenerator; } public boolean isRegistered() { @@ -116,6 +120,10 @@ public class DatasetResource implements Comparable<DatasetResource> { return datasetPrimaryOpTracker; } + public ILSMComponentIdGenerator getIdGenerator() { + return datasetComponentIdGenerator; + } + @Override public int compareTo(DatasetResource o) { return datasetInfo.compareTo(o.datasetInfo); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 01e33a7..6f35a3d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -45,13 +46,16 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { // Number of active operations on an ILSMIndex instance. private final AtomicInteger numActiveOperations; private final ILogManager logManager; + private final ILSMComponentIdGenerator idGenerator; private boolean flushOnExit = false; private boolean flushLogCreated = false; - public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo) { + public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + ILSMComponentIdGenerator idGenerator) { super(datasetID, dsInfo); this.logManager = logManager; this.numActiveOperations = new AtomicInteger(); + this.idGenerator = idGenerator; } @Override @@ -142,6 +146,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { + idGenerator.refresh(); for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { //get resource ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index 68f42e7..e445fe4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -20,27 +20,25 @@ package org.apache.asterix.common.ioopcallbacks; import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback { - private static final Logger LOGGER = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName()); public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes()); public static final long INVALID = -1L; @@ -56,8 +54,11 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC // Index of the currently being written to component protected int writeIndex; - public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex) { + protected final ILSMComponentIdGenerator idGenerator; + + public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) { this.lsmIndex = lsmIndex; + this.idGenerator = idGenerator; int count = lsmIndex.getNumberOfAllMemoryComponents(); mutableLastLSNs = new long[count]; firstLSNs = new long[count]; @@ -114,40 +115,22 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC return pointable.getLength() == 0 ? INVALID : pointable.longValue(); } - private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException { - if (oldComponents == null) { - //if oldComponents == null, then getComponentLSN would treat it as a flush operation, - //and return the LSN for the flushed component - long id = getComponentLSN(null); - if (id == 0) { - LOGGER.log(Level.WARNING, "Flushing a memory component without setting the LSN"); - id = ILSMDiskComponentId.NOT_FOUND; - } - return new LSMDiskComponentId(id, id); - } else { - long minId = Long.MAX_VALUE; - long maxId = Long.MIN_VALUE; - for (ILSMComponent oldComponent : oldComponents) { - ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId(); - if (oldComponentId.getMinId() < minId) { - minId = oldComponentId.getMinId(); - } - if (oldComponentId.getMaxId() > maxId) { - maxId = oldComponentId.getMaxId(); - } - } - return new LSMDiskComponentId(minId, maxId); + private ILSMComponentId getMergedComponentId(List<ILSMComponent> mergedComponents) throws HyracksDataException { + if (mergedComponents == null || mergedComponents.isEmpty()) { + return null; } + return LSMComponentIdUtils.union(mergedComponents.get(0).getId(), + mergedComponents.get(mergedComponents.size() - 1).getId()); + } - private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents) - throws HyracksDataException { - DiskComponentMetadata metadata = component.getMetadata(); - ILSMDiskComponentId componentId = getComponentId(oldComponents); - metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, - LongPointable.FACTORY.createPointable(componentId.getMinId())); - metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, - LongPointable.FACTORY.createPointable(componentId.getMaxId())); + private void putComponentIdIntoMetadata(LSMIOOperationType opType, ILSMDiskComponent newComponent, + List<ILSMComponent> oldComponents) throws HyracksDataException { + // the id of flushed component is set when we copy the metadata of the memory component + if (opType == LSMIOOperationType.MERGE) { + ILSMComponentId componentId = getMergedComponentId(oldComponents); + LSMComponentIdUtils.persist(componentId, newComponent.getMetadata()); + } } public synchronized void updateLastLSN(long lastLSN) { @@ -188,7 +171,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here if (newComponent != null) { putLSNIntoMetadata(newComponent, oldComponents); - putComponentIdIntoMetadata(newComponent, oldComponents); + putComponentIdIntoMetadata(opType, newComponent, oldComponents); if (opType == LSMIOOperationType.MERGE) { // In case of merge, oldComponents are never null LongPointable markerLsn = @@ -196,7 +179,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); } - } } @@ -220,12 +202,12 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC @Override public void recycled(ILSMMemoryComponent component) throws HyracksDataException { - // No op + component.resetId(idGenerator.getId()); } @Override public void allocated(ILSMMemoryComponent component) throws HyracksDataException { - // No op + component.resetId(idGenerator.getId()); } /** @@ -237,4 +219,5 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC */ public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath) throws HyracksDataException; + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java new file mode 100644 index 0000000..16447fd --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.common.ioopcallbacks; + +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; + +public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { + + private static final long serialVersionUID = 1L; + + protected final ILSMComponentIdGeneratorFactory idGeneratorFactory; + + protected transient INCServiceContext ncCtx; + + public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + this.idGeneratorFactory = idGeneratorFactory; + } + + @Override + public void initialize(INCServiceContext ncCtx) { + this.ncCtx = ncCtx; + } + + protected ILSMComponentIdGenerator getComponentIdGenerator() { + assert ncCtx != null; + return idGeneratorFactory.getComponentIdGenerator(ncCtx); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java index c7fbb65..c1ee03b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java @@ -23,13 +23,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback { - public LSMBTreeIOOperationCallback(ILSMIndex index) { - super(index); + public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) { + super(index, idGenerator); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java index e3abb6b..4ef12ef 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java @@ -19,21 +19,20 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { +public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory { private static final long serialVersionUID = 1L; - public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory(); - - private LSMBTreeIOOperationCallbackFactory() { + public LSMBTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + super(idGeneratorFactory); } @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { - return new LSMBTreeIOOperationCallback(index); + return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java index 67d623a..b43fb2f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java @@ -22,13 +22,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback { - public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex) { - super(lsmIndex); + public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) { + super(lsmIndex, idGenerator); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java index 93f505c..6727bf6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java @@ -18,22 +18,20 @@ */ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -public class LSMBTreeWithBuddyIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { +public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory { private static final long serialVersionUID = 1L; - public static final LSMBTreeWithBuddyIOOperationCallbackFactory INSTANCE = - new LSMBTreeWithBuddyIOOperationCallbackFactory(); - - private LSMBTreeWithBuddyIOOperationCallbackFactory() { + public LSMBTreeWithBuddyIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + super(idGeneratorFactory); } @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { - return new LSMBTreeWithBuddyIOOperationCallback(index); + return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java index 2d27b78..015cd38 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java @@ -21,6 +21,7 @@ package org.apache.asterix.common.ioopcallbacks; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent; @@ -28,8 +29,8 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFil public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback { - public LSMInvertedIndexIOOperationCallback(ILSMIndex index) { - super(index); + public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) { + super(index, idGenerator); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java index 47a67b2..a2712d1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java @@ -19,22 +19,20 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { +public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory { private static final long serialVersionUID = 1L; - public static final LSMInvertedIndexIOOperationCallbackFactory INSTANCE = - new LSMInvertedIndexIOOperationCallbackFactory(); - - private LSMInvertedIndexIOOperationCallbackFactory() { + public LSMInvertedIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + super(idGeneratorFactory); } @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { - return new LSMInvertedIndexIOOperationCallback(index); + return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java index 9ba99f9..bc79074 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java @@ -21,6 +21,7 @@ package org.apache.asterix.common.ioopcallbacks; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager; @@ -28,8 +29,8 @@ import org.apache.hyracks.storage.am.rtree.impls.RTree; public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback { - public LSMRTreeIOOperationCallback(ILSMIndex index) { - super(index); + public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) { + super(index, idGenerator); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java index 14cf648..087aaae 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java @@ -19,21 +19,20 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { +public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory { private static final long serialVersionUID = 1L; - public static final LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory(); - - private LSMRTreeIOOperationCallbackFactory() { + public LSMRTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { + super(idGeneratorFactory); } @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { - return new LSMRTreeIOOperationCallback(index); + return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 01f08db..2928d90 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -33,13 +33,13 @@ import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.junit.Assert; import org.junit.Test; @@ -62,14 +62,13 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { @Test public void testBasic() { try { - List<ILSMDiskComponentId> componentIDs = - Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4), - new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)); + List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(5, 5), new LSMComponentId(4, 4), + new LSMComponentId(3, 3), new LSMComponentId(2, 2), new LSMComponentId(1, 1)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>(); IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>(); IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); ILSMMergePolicy policy = mockMergePolicy(primary, secondary); @@ -79,10 +78,10 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3), - new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3), - new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3), + new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3), + new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultSecondaryIDs); } catch (HyracksDataException e) { Assert.fail(e.getMessage()); @@ -93,14 +92,13 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { @Test public void testIDIntervals() { try { - List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), - new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), - new LSMDiskComponentId(10, 19)); + List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35), + new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>(); IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>(); IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); ILSMMergePolicy policy = mockMergePolicy(primary, secondary); @@ -110,10 +108,10 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs); } catch (HyracksDataException e) { Assert.fail(e.getMessage()); @@ -123,15 +121,15 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { @Test public void testSecondaryMissing() { try { - List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), - new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), - new LSMDiskComponentId(10, 19)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + List<ILSMComponentId> primaryComponentIDs = + Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)); + List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>(); IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); - List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), - new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24)); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + List<ILSMComponentId> secondaryComponentIDs = + Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24)); + List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>(); IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); ILSMMergePolicy policy = mockMergePolicy(primary, secondary); @@ -141,10 +139,11 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { Assert.assertTrue(resultSecondaryIDs.isEmpty()); policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24)), resultSecondaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals( + Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24)), + resultSecondaryIDs); } catch (HyracksDataException e) { Assert.fail(e.getMessage()); @@ -154,17 +153,16 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { @Test public void testMultiPartition() { try { - List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), - new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), - new LSMDiskComponentId(10, 19)); + List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35), + new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>(); IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>(); IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); - List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>(); + List<ILSMComponentId> resultSecondaryIDs1 = new ArrayList<>(); IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs, 1); ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1); @@ -174,10 +172,10 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), + new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs); Assert.assertTrue(resultSecondaryIDs1.isEmpty()); } catch (HyracksDataException e) { Assert.fail(e.getMessage()); @@ -205,12 +203,12 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { return policy; } - private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs, - List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException { + private IndexInfo mockIndex(boolean isPrimary, List<ILSMComponentId> componentIDs, + List<ILSMComponentId> resultComponentIDs, int partition) throws HyracksDataException { List<ILSMDiskComponent> components = new ArrayList<>(); - for (ILSMDiskComponentId id : componentIDs) { + for (ILSMComponentId id : componentIDs) { ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class); - Mockito.when(component.getComponentId()).thenReturn(id); + Mockito.when(component.getId()).thenReturn(id); Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE); Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE); components.add(component); @@ -227,7 +225,7 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class); mergedComponents.forEach(component -> { try { - resultComponentIDs.add(component.getComponentId()); + resultComponentIDs.add(component.getId()); } catch (HyracksDataException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java index d48227f..f467ee8 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.junit.Assert; import org.mockito.Mockito; @@ -34,7 +35,8 @@ public class LSMBTreeIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); @@ -58,7 +60,8 @@ public class LSMBTreeIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java index 94ef0a3..63c46f7 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallb import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.junit.Assert; import org.mockito.Mockito; @@ -34,7 +35,8 @@ public class LSMBTreeWithBuddyIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex); + LSMBTreeWithBuddyIOOperationCallback callback = + new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); @@ -58,7 +60,8 @@ public class LSMBTreeWithBuddyIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex); + LSMBTreeWithBuddyIOOperationCallback callback = + new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java index b213da0..1e961d8 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallba import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.junit.Assert; import org.mockito.Mockito; @@ -34,7 +35,8 @@ public class LSMInvertedIndexIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex); + LSMInvertedIndexIOOperationCallback callback = + new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); @@ -58,7 +60,8 @@ public class LSMInvertedIndexIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex); + LSMInvertedIndexIOOperationCallback callback = + new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java index df26ef9..618f2a3 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.junit.Assert; import org.mockito.Mockito; @@ -34,7 +35,8 @@ public class LSMRTreeIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex); + LSMRTreeIOOperationCallback callback = + new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); @@ -58,7 +60,8 @@ public class LSMRTreeIOOperationCallbackTest extends TestCase { try { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex); + LSMRTreeIOOperationCallback callback = + new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); //request to flush first component callback.updateLastLSN(1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index a6f1ad0..d2622c4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; @@ -79,6 +80,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexBuilder; import org.apache.hyracks.storage.am.common.build.IndexBuilder; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper; import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; @@ -324,7 +326,10 @@ public class MetadataBootstrap { ILSMOperationTrackerFactory opTrackerFactory = index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId) : new SecondaryIndexOperationTrackerFactory(datasetId); - ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE; + ILSMComponentIdGeneratorFactory idGeneratorProvider = + new DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId()); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = + new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider); IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider(); if (isNewUniverse()) { LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
