[ASTERIXDB-2231][STO] Separate primary op tracker for each partition - user model changes: no - storage format changes: no. - interface changes: yes.
Details: - Separate primary index operation tracker for each partition, instead of having a global one on each NC to achieve better scalability. - As a coordinated change, separate component id generator for each partition as well. - Add partition to transaction context so that transaction operations can operate on proper op tracker. - Fixes [ASTERIXDB-2232] to calculate dataset partitions correctly. Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2263 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2f934e31 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2f934e31 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2f934e31 Branch: refs/heads/master Commit: 2f934e312e841223aa0e4941e01f23b9e63fdc18 Parents: f94fdcc Author: luochen01 <[email protected]> Authored: Thu Feb 1 09:17:23 2018 -0800 Committer: abdullah alamoudi <[email protected]> Committed: Thu Feb 1 23:35:43 2018 -0800 ---------------------------------------------------------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 22 +-- .../test/dataflow/ComponentRollbackTest.java | 14 +- .../dataflow/MultiPartitionLSMIndexTest.java | 5 +- .../TestLsmBtreeIoOpCallbackFactory.java | 3 +- .../TestPrimaryIndexOperationTracker.java | 4 +- ...TestPrimaryIndexOperationTrackerFactory.java | 29 ++- .../asterix/test/metadata/MetadataTxnTest.java | 7 +- .../apache/asterix/test/txn/LogManagerTest.java | 2 +- .../common/api/IDatasetLifecycleManager.java | 6 +- .../common/api/INcApplicationContext.java | 2 +- .../context/CorrelatedPrefixMergePolicy.java | 25 +-- .../asterix/common/context/DatasetInfo.java | 43 +++-- .../DatasetLSMComponentIdGeneratorFactory.java | 9 +- .../common/context/DatasetLifecycleManager.java | 192 +++++++++++-------- .../asterix/common/context/DatasetResource.java | 45 +++-- .../context/PrimaryIndexOperationTracker.java | 12 +- ...tractLSMIndexIOOperationCallbackFactory.java | 13 +- .../LSMBTreeIOOperationCallbackFactory.java | 3 +- ...TreeWithBuddyIOOperationCallbackFactory.java | 3 +- ...InvertedIndexIOOperationCallbackFactory.java | 3 +- .../LSMRTreeIOOperationCallbackFactory.java | 3 +- .../transactions/ITransactionContext.java | 8 +- .../CorrelatedPrefixMergePolicyTest.java | 27 +-- .../apache/asterix/metadata/MetadataNode.java | 5 +- .../metadata/bootstrap/MetadataBootstrap.java | 2 +- .../asterix/metadata/entities/Dataset.java | 7 +- .../job/listener/JobEventListenerFactory.java | 3 +- ...dexModificationOperationCallbackFactory.java | 2 +- .../PrimaryIndexOperationTrackerFactory.java | 9 +- ...dexModificationOperationCallbackFactory.java | 2 +- .../SecondaryIndexOperationTrackerFactory.java | 5 +- .../UpsertOperationCallbackFactory.java | 2 +- .../resource/DatasetLocalResourceFactory.java | 3 +- .../management/service/logging/LogBuffer.java | 2 +- .../transaction/AbstractTransactionContext.java | 2 +- .../transaction/AtomicTransactionContext.java | 6 +- .../EntityLevelTransactionContext.java | 44 +++-- .../dataflow/ExternalBTreeLocalResource.java | 4 +- .../ExternalBTreeWithBuddyLocalResource.java | 4 +- .../btree/dataflow/LSMBTreeLocalResource.java | 4 +- .../am/lsm/btree/impls/ExternalBTree.java | 3 +- .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 4 +- .../storage/am/lsm/btree/impls/LSMBTree.java | 2 +- .../am/lsm/btree/utils/LSMBTreeUtil.java | 6 +- .../api/ILSMComponentIdGeneratorFactory.java | 6 +- .../api/ILSMIOOperationCallbackFactory.java | 8 +- .../common/api/ILSMOperationTrackerFactory.java | 4 +- .../am/lsm/common/impls/AbstractLSMIndex.java | 4 +- .../impls/LSMComponentIdGeneratorFactory.java | 3 +- .../impls/NoOpIOOperationCallbackFactory.java | 3 +- .../impls/NoOpOperationTrackerFactory.java | 3 +- .../ThreadCountingOperationTrackerFactory.java | 3 +- .../dataflow/LSMInvertedIndexLocalResource.java | 6 +- .../dataflow/ExternalRTreeLocalResource.java | 8 +- .../rtree/dataflow/LSMRTreeLocalResource.java | 9 +- .../LSMRTreeWithAntiMatterLocalResource.java | 4 +- .../am/lsm/rtree/impls/AbstractLSMRTree.java | 2 +- .../am/lsm/rtree/impls/ExternalRTree.java | 2 +- .../storage/am/lsm/rtree/impls/LSMRTree.java | 2 +- ...MBTreeModificationOperationCallbackTest.java | 2 +- .../LSMBTreeSearchOperationCallbackTest.java | 2 +- .../am/lsm/btree/LSMBTreeUpdateInPlaceTest.java | 2 +- .../btree/impl/TestLsmBtreeLocalResource.java | 4 +- 63 files changed, 408 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index c554cbd..366438a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -109,15 +109,15 @@ public class NCAppRuntimeContext implements INcApplicationContext { private ILSMMergePolicyFactory metadataMergePolicyFactory; private final INCServiceContext ncServiceContext; private final IResourceIdFactory resourceIdFactory; - private CompilerProperties compilerProperties; - private ExternalProperties externalProperties; - private MetadataProperties metadataProperties; - private StorageProperties storageProperties; - private TransactionProperties txnProperties; - private ActiveProperties activeProperties; - private BuildProperties buildProperties; - private ReplicationProperties replicationProperties; - private MessagingProperties messagingProperties; + private final CompilerProperties compilerProperties; + private final ExternalProperties externalProperties; + private final MetadataProperties metadataProperties; + private final StorageProperties storageProperties; + private final TransactionProperties txnProperties; + private final ActiveProperties activeProperties; + private final BuildProperties buildProperties; + private final ReplicationProperties replicationProperties; + private final MessagingProperties messagingProperties; private final NodeProperties nodeProperties; private ExecutorService threadExecutor; private IDatasetMemoryManager datasetMemoryManager; @@ -373,8 +373,8 @@ public class NCAppRuntimeContext implements INcApplicationContext { } @Override - public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { - return datasetLifecycleManager.getOperationTracker(datasetID); + public ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition) { + return datasetLifecycleManager.getOperationTracker(datasetID, partition); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index a46b029..c6232f5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -144,7 +144,7 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); @@ -153,7 +153,7 @@ public class ComponentRollbackTest { // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -201,7 +201,7 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); @@ -227,7 +227,7 @@ public class ComponentRollbackTest { // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -276,7 +276,7 @@ public class ComponentRollbackTest { firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents( @@ -298,7 +298,7 @@ public class ComponentRollbackTest { lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -702,7 +702,7 @@ public class ComponentRollbackTest { public void run() { ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); lsmAccessor.deleteComponents(predicate); } catch (HyracksDataException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java index 367d0b9..62705cc 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java @@ -251,7 +251,7 @@ public class MultiPartitionLSMIndexTest { /** * This test update partition 0, schedule flush and modify partition 1 - * Then ensure that in partition 1, primary and secondary have different component ids + * Then ensure that in partition 1, primary and secondary have the same component ids */ @Test public void testAllocateWhileFlushIsScheduled() { @@ -400,7 +400,8 @@ public class MultiPartitionLSMIndexTest { AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false); AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false); MutableBoolean proceedToScheduleFlush = new MutableBoolean(false); - addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() { + // keep track of the flush of partition 1 since partitions 0 and 1 are flushed seperately + addOpTrackerCallback(primaryLsmBtrees[1], new ITestOpCallback<Void>() { @Override public void before(Void t) { synchronized (arrivedAtSchduleFlush) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index 4bfc581..c69ffe5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; @@ -52,7 +51,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback } @Override - public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { completedFlushes = 0; completedMerges = 0; rollbackFlushes = 0; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java index e376ff9..9a528d3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java @@ -33,9 +33,9 @@ public class TestPrimaryIndexOperationTracker extends PrimaryIndexOperationTrack private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>(); - public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { - super(datasetID, logManager, dsInfo, idGenerator); + super(datasetID, partition, logManager, dsInfo, idGenerator); } public void addCallback(ITestOpCallback<Void> callback) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java index 5d7a7c6..e6b34b8 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java @@ -20,19 +20,22 @@ package org.apache.asterix.test.dataflow; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Map; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.context.DatasetResource; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.common.IResource; public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory { private static final long serialVersionUID = 1L; - private int datasetId; + private final int datasetId; public TestPrimaryIndexOperationTrackerFactory(int datasetId) { super(datasetId); @@ -40,17 +43,19 @@ public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperati } @Override - public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { try { INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext(); DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager(); DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId); - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition); if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) { - Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker"); - opTracker = new TestPrimaryIndexOperationTracker(datasetId, - appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator()); - setFinal(opTrackerField, dsr, opTracker); + Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers"); + opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition, + appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), + dslcManager.getComponentIdGenerator(datasetId, partition)); + replaceMapEntry(opTrackersField, dsr, partition, opTracker); } return opTracker; } catch (Exception e) { @@ -65,4 +70,14 @@ public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperati modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); field.set(obj, newValue); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + static void replaceMapEntry(Field field, Object obj, Object key, Object value) + throws Exception, IllegalAccessException { + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + Map map = (Map) field.get(obj); + map.put(key, value); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java index a10c234..70e5f6e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java @@ -42,6 +42,7 @@ import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.junit.After; import org.junit.Assert; @@ -203,8 +204,10 @@ public class MetadataTxnTest { ((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager(); int maxMetadatasetId = 14; for (int i = 1; i <= maxMetadatasetId; i++) { - if (datasetLifecycleManager.getIndex(i, i) != null) { - final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, i); + if (index != null) { + final PrimaryIndexOperationTracker opTracker = + (PrimaryIndexOperationTracker) index.getOperationTracker(); Assert.assertEquals(0, opTracker.getNumActiveOperations()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java index a1978eb..6a70a29 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -170,7 +170,7 @@ public class LogManagerTest { final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL); final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager(); final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options); - txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true); + txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, true); return txnCtx; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 41c5ade..4441c6e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -75,17 +75,19 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd * creates (if necessary) and returns the primary index operation tracker of a dataset. * * @param datasetId + * @param partition * @return */ - PrimaryIndexOperationTracker getOperationTracker(int datasetId); + PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition); /** * creates (if necessary) and returns the component Id generator of a dataset. * * @param datasetId + * @param partition * @return */ - ILSMComponentIdGenerator getComponentIdGenerator(int datasetId); + ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition); /** * creates (if necessary) and returns the dataset virtual buffer caches. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 8a83c7b..fffc170 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -66,7 +66,7 @@ public interface INcApplicationContext extends IApplicationContext { IResourceIdFactory getResourceIdFactory(); - ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID); + ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition); void initialize(boolean initialRun) throws IOException, ACIDException, AlgebricksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index e5fc998..41461ec 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.commons.lang3.tuple.Pair; @@ -92,10 +91,10 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft()); ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight()); ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId()); - Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); - int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(targetId, - indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); + int partition = ((PrimaryIndexOperationTracker) index.getOperationTracker()).getPartition(); + Set<ILSMIndex> indexes = + datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionOpenIndexes(partition); + triggerScheduledMerge(targetId, indexes); return true; } @@ -107,11 +106,8 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos) - throws HyracksDataException { - for (IndexInfo info : indexInfos) { - ILSMIndex lsmIndex = info.getIndex(); - + private void triggerScheduledMerge(ILSMComponentId targetId, Set<ILSMIndex> indexes) throws HyracksDataException { + for (ILSMIndex lsmIndex : indexes) { List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); if (isMergeOngoing(immutableComponents)) { continue; @@ -132,13 +128,4 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } } - - private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { - for (IndexInfo info : indexInfos) { - if (info.getIndex() == index) { - return info.getPartition(); - } - } - return -1; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 9d63818..44baf77 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -30,6 +30,9 @@ import org.apache.logging.log4j.Logger; public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private static final Logger LOGGER = LogManager.getLogger(); + // partition -> index + private final Map<Integer, Set<IndexInfo>> partitionIndexes; + // resourceID -> index private final Map<Long, IndexInfo> indexes; private final int datasetID; private int numActiveIOOps; @@ -40,6 +43,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private boolean durable; public DatasetInfo(int datasetID) { + this.partitionIndexes = new HashMap<>(); this.indexes = new HashMap<>(); this.setLastAccess(-1); this.datasetID = datasetID; @@ -69,26 +73,17 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { notifyAll(); } - public synchronized Set<ILSMIndex> getDatasetIndexes() { - Set<ILSMIndex> datasetIndexes = new HashSet<>(); - for (IndexInfo iInfo : getIndexes().values()) { - if (iInfo.isOpen()) { - datasetIndexes.add(iInfo.getIndex()); - } - } - - return datasetIndexes; - } - - public synchronized Set<IndexInfo> getDatsetIndexInfos() { - Set<IndexInfo> infos = new HashSet<>(); - for (IndexInfo iInfo : getIndexes().values()) { - if (iInfo.isOpen()) { - infos.add(iInfo); + public synchronized Set<ILSMIndex> getDatasetPartitionOpenIndexes(int partition) { + Set<ILSMIndex> indexSet = new HashSet<>(); + Set<IndexInfo> partitionIndexInfos = this.partitionIndexes.get(partition); + if (partitionIndexInfos != null) { + for (IndexInfo iInfo : partitionIndexInfos) { + if (iInfo.isOpen()) { + indexSet.add(iInfo.getIndex()); + } } } - - return infos; + return indexSet; } @Override @@ -160,6 +155,18 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { return indexes; } + public synchronized void addIndex(long resourceID, IndexInfo indexInfo) { + indexes.put(resourceID, indexInfo); + partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo); + } + + public synchronized void removeIndex(long resourceID) { + IndexInfo info = indexes.remove(resourceID); + if (info != null) { + partitionIndexes.get(info.getPartition()).remove(info); + } + } + public boolean isRegistered() { return isRegistered; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java index 7b8397c..83e3144 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java @@ -21,9 +21,12 @@ package org.apache.asterix.common.context; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; +import org.apache.hyracks.storage.common.IResource; /** * This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition) @@ -41,10 +44,12 @@ public class DatasetLSMComponentIdGeneratorFactory implements ILSMComponentIdGen } @Override - public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) { + public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) + throws HyracksDataException { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager(); - return dslcManager.getComponentIdGenerator(datasetId); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + return dslcManager.getComponentIdGenerator(datasetId, partition); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 3a70515..1a61b8f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -21,6 +21,7 @@ package org.apache.asterix.common.context; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -139,7 +140,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); } - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); + PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition()); if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) { if (LOGGER.isErrorEnabled()) { final String logMsg = String.format( @@ -155,7 +156,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC DatasetInfo dsInfo = dsr.getDatasetInfo(); dsInfo.waitForIO(); closeIndex(iInfo); - dsInfo.getIndexes().remove(resourceID); + dsInfo.removeIndex(resourceID); if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty() && !dsInfo.isExternal()) { removeDatasetFromCache(dsInfo.getDatasetID()); @@ -203,10 +204,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values()); Collections.sort(datasetsResources); for (DatasetResource dsr : datasetsResources) { - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); - if (opTracker != null && opTracker.getNumActiveOperations() == 0 - && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() - && !dsr.isMetadataDataset()) { + if (isCandidateDatasetForEviction(dsr)) { closeDataset(dsr); LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID()); return true; @@ -215,14 +213,18 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC return false; } - private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException { - if (iInfo.isOpen()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); + private boolean isCandidateDatasetForEviction(DatasetResource dsr) { + for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) { + if (opTracker.getNumActiveOperations() != 0) { + return false; + } + } + if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen() + || dsr.isMetadataDataset()) { + return false; } - // Wait for the above flush op. - dsInfo.waitForIO(); + return true; } public DatasetResource getDatasetLifecycle(int did) { @@ -234,12 +236,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC dsr = datasets.get(did); if (dsr == null) { DatasetInfo dsInfo = new DatasetInfo(did); - ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); - PrimaryIndexOperationTracker opTracker = - new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator); DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties, memoryManager.getNumPages(did), numPartitions); - dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator); + dsr = new DatasetResource(dsInfo, vbcs); datasets.put(did, dsr); } return dsr; @@ -318,13 +317,33 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public PrimaryIndexOperationTracker getOperationTracker(int datasetId) { - return datasets.get(datasetId).getOpTracker(); + public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) { + DatasetResource dataset = datasets.get(datasetId); + PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition); + if (opTracker == null) { + populateOpTrackerAndIdGenerator(dataset, partition); + opTracker = dataset.getOpTracker(partition); + } + return opTracker; } @Override - public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) { - return datasets.get(datasetId).getIdGenerator(); + public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) { + DatasetResource dataset = datasets.get(datasetId); + ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition); + if (generator == null) { + populateOpTrackerAndIdGenerator(dataset, partition); + generator = dataset.getComponentIdGenerator(partition); + } + return generator; + } + + private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition, + logManager, dataset.getDatasetInfo(), idGenerator); + dataset.setPrimaryIndexOperationTracker(partition, opTracker); + dataset.setIdGenerator(partition, idGenerator); } private void validateDatasetLifecycleManagerState() throws HyracksDataException { @@ -357,31 +376,40 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException { //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN for (DatasetResource dsr : datasets.values()) { - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); - synchronized (opTracker) { - for (IndexInfo iInfo : dsr.getIndexes().values()) { - AbstractLSMIOOperationCallback ioCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() - || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { - long firstLSN = ioCallback.getFirstLSN(); - if (firstLSN < targetLSN) { - LOGGER.info("Checkpoint flush dataset {}", dsr.getDatasetID()); - opTracker.setFlushOnExit(true); - if (opTracker.getNumActiveOperations() == 0) { - // No Modify operations currently, we need to trigger the flush and we can do so safely - opTracker.flushIfRequested(); - } - break; - } + for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) { + // check all partitions + synchronized (opTracker) { + scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN); + } + } + } + } + + private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr, + PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException { + int partition = opTracker.getPartition(); + for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) { + AbstractLSMIOOperationCallback ioCallback = + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() + || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { + long firstLSN = ioCallback.getFirstLSN(); + if (firstLSN < targetLSN) { + LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition); + opTracker.setFlushOnExit(true); + if (opTracker.getNumActiveOperations() == 0) { + // No Modify operations currently, we need to trigger the flush and we can do so safely + opTracker.flushIfRequested(); } + break; } } } } /* - * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled + * This method can only be called asynchronously safely if we're sure no modify operation + * will take place until the flush is scheduled */ private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException { DatasetInfo dsInfo = dsr.getDatasetInfo(); @@ -389,53 +417,61 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC // no memory components for external dataset return; } - PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker(); - if (primaryOpTracker.getNumActiveOperations() > 0) { - throw new IllegalStateException( - "flushDatasetOpenIndexes is called on a dataset with currently active operations"); - } - - ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); - idGenerator.refresh(); + for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { + // flush each partition one by one + if (primaryOpTracker.getNumActiveOperations() > 0) { + throw new IllegalStateException( + "flushDatasetOpenIndexes is called on a dataset with currently active operations"); + } + int partition = primaryOpTracker.getPartition(); + Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); + ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition); + idGenerator.refresh(); + + if (dsInfo.isDurable()) { + synchronized (logRecord) { + TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null); + try { + logManager.log(logRecord); + } catch (ACIDException e) { + throw new HyracksDataException("could not write flush log while closing dataset", e); + } - if (dsInfo.isDurable()) { - synchronized (logRecord) { - TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null); - try { - logManager.log(logRecord); - } catch (ACIDException e) { - throw new HyracksDataException("could not write flush log while closing dataset", e); + try { + //notification will come from LogBuffer class (notifyFlushTerminator) + logRecord.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } } + } + for (ILSMIndex index : indexes) { + //update resource lsn + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) index.getIOOperationCallback(); + ioOpCallback.updateLastLSN(logRecord.getLSN()); + } - try { - //notification will come from LogPage class (notifyFlushTerminator) - logRecord.wait(); - } catch (InterruptedException e) { - throw new HyracksDataException(e); + if (asyncFlush) { + for (ILSMIndex index : indexes) { + ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + accessor.scheduleFlush(index.getIOOperationCallback()); + } + } else { + for (ILSMIndex index : indexes) { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. + // We don't do them concurrently because this may lead to a deadlock scenario + // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. + ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + accessor.scheduleFlush(index.getIOOperationCallback()); + // Wait for the above flush op. + dsInfo.waitForIO(); } } } - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - //update resource lsn - AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - ioOpCallback.updateLastLSN(logRecord.getLSN()); - } - - if (asyncFlush) { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); - } - } else { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this - // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. - flushAndWaitForIO(dsInfo, iInfo); - } - } } private void closeDataset(DatasetResource dsr) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index c02de7e..8dcae23 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.context; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; @@ -41,17 +43,16 @@ import org.apache.hyracks.storage.common.LocalResource; */ public class DatasetResource implements Comparable<DatasetResource> { private final DatasetInfo datasetInfo; - private final PrimaryIndexOperationTracker datasetPrimaryOpTracker; private final DatasetVirtualBufferCaches datasetVirtualBufferCaches; - private final ILSMComponentIdGenerator datasetComponentIdGenerator; - public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker, - DatasetVirtualBufferCaches datasetVirtualBufferCaches, - ILSMComponentIdGenerator datasetComponentIdGenerator) { + private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers; + private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators; + + public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) { this.datasetInfo = datasetInfo; - this.datasetPrimaryOpTracker = datasetPrimaryOpTracker; this.datasetVirtualBufferCaches = datasetVirtualBufferCaches; - this.datasetComponentIdGenerator = datasetComponentIdGenerator; + this.datasetPrimaryOpTrackers = new HashMap<>(); + this.datasetComponentIdGenerators = new HashMap<>(); } public boolean isRegistered() { @@ -108,7 +109,8 @@ public class DatasetResource implements Comparable<DatasetResource> { if (index == null) { throw new HyracksDataException("Attempt to register a null index"); } - datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource, + + datasetInfo.addIndex(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource, ((DatasetLocalResource) resource.getResource()).getPartition())); } @@ -116,12 +118,31 @@ public class DatasetResource implements Comparable<DatasetResource> { return datasetInfo; } - public PrimaryIndexOperationTracker getOpTracker() { - return datasetPrimaryOpTracker; + public PrimaryIndexOperationTracker getOpTracker(int partition) { + return datasetPrimaryOpTrackers.get(partition); + } + + public Collection<PrimaryIndexOperationTracker> getOpTrackers() { + return datasetPrimaryOpTrackers.values(); + } + + public ILSMComponentIdGenerator getComponentIdGenerator(int partition) { + return datasetComponentIdGenerators.get(partition); } - public ILSMComponentIdGenerator getIdGenerator() { - return datasetComponentIdGenerator; + public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) { + if (datasetPrimaryOpTrackers.containsKey(partition)) { + throw new IllegalStateException( + "PrimaryIndexOperationTracker has already been set for partition " + partition); + } + datasetPrimaryOpTrackers.put(partition, opTracker); + } + + public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) { + if (datasetComponentIdGenerators.containsKey(partition)) { + throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition); + } + datasetComponentIdGenerators.put(partition, idGenerator); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 5170310..1a76b66 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -43,6 +43,7 @@ import org.apache.hyracks.storage.common.ISearchOperationCallback; public class PrimaryIndexOperationTracker extends BaseOperationTracker { + private final int partition; // Number of active operations on an ILSMIndex instance. private final AtomicInteger numActiveOperations; private final ILogManager logManager; @@ -50,9 +51,10 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { private boolean flushOnExit = false; private boolean flushLogCreated = false; - public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { super(datasetID, dsInfo); + this.partition = partition; this.logManager = logManager; this.numActiveOperations = new AtomicInteger(); this.idGenerator = idGenerator; @@ -100,7 +102,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it boolean needsFlush = false; - Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes(); + Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); if (!flushOnExit) { for (ILSMIndex lsmIndex : indexes) { @@ -146,7 +148,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { idGenerator.refresh(); - for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { + for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) { //get resource ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); //update resource lsn @@ -199,4 +201,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { return flushLogCreated; } + public int getPartition() { + return partition; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java index ed56ab1..5b9883c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java @@ -24,11 +24,13 @@ import java.io.ObjectStreamException; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; +import org.apache.hyracks.storage.common.IResource; public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { @@ -38,17 +40,20 @@ public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSM protected transient INCServiceContext ncCtx; + protected transient IResource resource; + public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { this.idGeneratorFactory = idGeneratorFactory; } @Override - public void initialize(INCServiceContext ncCtx) { + public void initialize(INCServiceContext ncCtx, IResource resource) { this.ncCtx = ncCtx; + this.resource = resource; } - protected ILSMComponentIdGenerator getComponentIdGenerator() { - return idGeneratorFactory.getComponentIdGenerator(ncCtx); + protected ILSMComponentIdGenerator getComponentIdGenerator() throws HyracksDataException { + return idGeneratorFactory.getComponentIdGenerator(ncCtx, resource); } protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() { @@ -60,7 +65,7 @@ public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSM private static final long serialVersionUID = 1L; @Override - public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) { + public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) { // used for backward compatibility // if idGeneratorFactory is not set for legacy lsm indexes, we return a default // component id generator which always generates the missing component id. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java index 95245cb..97badb2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java index 6c75ed6..9b32345 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -31,7 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMInde } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java index fb73d19..766ef95 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndex } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java index 94be0bb..3a0afa8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java index c4a2d03..a3d5bc5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java @@ -38,11 +38,13 @@ public interface ITransactionContext { * transaction. * * @param resourceId + * @param partition * @param index * @param callback * @param primaryIndex */ - void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex); + void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback, + boolean primaryIndex); /** * Gets the unique transaction id. @@ -135,8 +137,10 @@ public interface ITransactionContext { * Called to notify the transaction that an entity commit * log belonging to this transaction has been flushed to * disk. + * + * @param partition */ - void notifyEntityCommitted(); + void notifyEntityCommitted(int partition); /** * Called after an operation is performed on index http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 3aa7b17..f9f742a 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -22,15 +22,14 @@ package org.apache.asterix.test.context; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -60,6 +59,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { private final int DATASET_ID = 1; + private long nextResourceId = 0; + @Test public void testBasic() { try { @@ -183,19 +184,15 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { } } - private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) { + private ILSMMergePolicy mockMergePolicy(IndexInfo... indexInfos) { Map<String, String> properties = new HashMap<>(); properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT)); properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE)); - Set<IndexInfo> indexInfos = new HashSet<>(); - for (IndexInfo info : indexes) { - indexInfos.add(info); + DatasetInfo dsInfo = new DatasetInfo(DATASET_ID); + for (IndexInfo index : indexInfos) { + dsInfo.addIndex(index.getResourceId(), index); } - - DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class); - Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos); - IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class); Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo); @@ -238,8 +235,16 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor); Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary); + if (isPrimary) { + PrimaryIndexOperationTracker opTracker = Mockito.mock(PrimaryIndexOperationTracker.class); + Mockito.when(opTracker.getPartition()).thenReturn(partition); + Mockito.when(index.getOperationTracker()).thenReturn(opTracker); + } final LocalResource localResource = Mockito.mock(LocalResource.class); - return new IndexInfo(index, DATASET_ID, localResource, partition); + Mockito.when(localResource.getId()).thenReturn(nextResourceId++); + IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, partition); + indexInfo.setOpen(true); + return indexInfo; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 6634e51..e8f2595 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -42,6 +42,7 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.ImmutableDatasetId; import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.api.ExtensionMetadataDataset; @@ -474,7 +475,9 @@ public class MetadataNode implements IMetadataNode { IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE); ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap); txnCtx.setWriteTxn(true); - txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex()); + txnCtx.register(metadataIndex.getResourceId(), + StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback, + metadataIndex.isPrimaryIndex()); LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager()); switch (op) { case INSERT: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 9753bcf..9ebd21b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -63,7 +63,6 @@ import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.MetadataUtil; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory; @@ -476,4 +475,5 @@ public class MetadataBootstrap { public static void setNewUniverse(boolean isNewUniverse) { MetadataBootstrap.isNewUniverse = isNewUniverse; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index ea2d715..8cd7053 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -42,6 +42,7 @@ import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; @@ -818,6 +819,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException { FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, getDatasetName()); - return IntStream.range(0, splitsForDataset.length).toArray(); + int[] partitions = new int[splitsForDataset.length]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = StoragePathUtil.getPartitionNumFromRelativePath(splitsForDataset[i].getPath()); + } + return partitions; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java index 7913d48..6faffc7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java @@ -18,13 +18,12 @@ */ package org.apache.asterix.runtime.job.listener; -import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel; - import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel; import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksJobletContext; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index d057c50..10c6b8f 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -73,7 +73,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable); - txnCtx.register(resource.getId(), index, modCallback, true); + txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true); return modCallback; } catch (ACIDException e) { throw HyracksDataException.create(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java index f40140a..eef1cb0 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java @@ -21,9 +21,12 @@ package org.apache.asterix.transaction.management.opcallbacks; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.common.IResource; public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory { @@ -36,10 +39,12 @@ public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTracker } @Override - public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) + throws HyracksDataException { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); - return dslcManager.getOperationTracker(datasetId); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + return dslcManager.getOperationTracker(datasetId, partition); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 26e1b22..a927da0 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -69,7 +69,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); - txnCtx.register(resource.getId(), index, modCallback, false); + txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, false); return modCallback; } catch (ACIDException e) { throw HyracksDataException.create(e);
