Repository: asterixdb Updated Branches: refs/heads/master f674168df -> 3fe67422d
[ASTERIXDB-2188] Ensure recovery of component ids - user model changes: no - storage format changes: yes. Flush log record format changes. - interface changes: no Details: - Add flush component ids to the flush log record. Upon seeing a flush log record during recovery, schedule a flush to all indexes in this partition s.t. LSN>maxDiskLSN to ensure component ids are properly maintained upon failed flushes. - Add a test case to ensure the correctness of the recovery logic of component ids Change-Id: I8c1fc2b209cfb9d3dafa216771d2b7032eb99e75 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2408 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3fe67422 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3fe67422 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3fe67422 Branch: refs/heads/master Commit: 3fe67422dd79449b0ed38ced42f8589005ee4423 Parents: f674168 Author: luochen01 <cl...@uci.edu> Authored: Fri Feb 23 09:24:30 2018 -0800 Committer: Luo Chen <cl...@uci.edu> Committed: Fri Feb 23 14:37:24 2018 -0800 ---------------------------------------------------------------------- .../apache/asterix/app/nc/RecoveryManager.java | 68 +++- .../app/bootstrap/TestNodeController.java | 67 ++-- .../test/dataflow/ComponentRollbackTest.java | 13 +- .../test/dataflow/LSMFlushRecoveryTest.java | 365 +++++++++++++++++++ .../SearchCursorComponentSwitchTest.java | 3 +- .../asterix/test/dataflow/StorageTestUtils.java | 31 +- .../common/context/DatasetLifecycleManager.java | 20 +- .../context/PrimaryIndexOperationTracker.java | 11 +- .../AbstractLSMIOOperationCallback.java | 11 +- .../asterix/common/transactions/ILogRecord.java | 13 +- 
.../asterix/common/transactions/LogRecord.java | 39 +- .../asterix/common/utils/TransactionUtil.java | 6 +- .../AbstractLSMIOOperationCallbackTest.java | 8 +- .../logging/RemoteLogsProcessor.java | 4 +- .../am/lsm/common/api/ILSMMemoryComponent.java | 4 +- .../impls/AbstractLSMMemoryComponent.java | 4 +- .../am/lsm/btree/impl/AllowTestOpCallback.java | 41 +++ .../storage/am/lsm/btree/impl/TestLsmBtree.java | 22 +- 18 files changed, 667 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 273d832..a8d8610 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -44,6 +44,8 @@ import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.context.DatasetInfo; +import org.apache.asterix.common.context.IndexInfo; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; @@ -69,9 +71,14 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; import org.apache.logging.log4j.Level; @@ -284,6 +291,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); ILogRecord logRecord = null; + ILSMComponentIdGenerator idGenerator = null; try { logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); @@ -363,10 +371,51 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } break; + case LogType.FLUSH: + int partition = logRecord.getResourcePartition(); + if (partitions.contains(partition)) { + int datasetId = logRecord.getDatasetId(); + idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition); + if (idGenerator == null) { + // it's possible this dataset has been dropped + logRecord = logReader.next(); + continue; + } + idGenerator.refresh(); + DatasetInfo dsInfo = datasetLifecycleManager.getDatasetInfo(datasetId); + // we only need to flush open indexes here (opened by previous update records) + // if an index has no ongoing updates, then its memory component must be empty + // and there is nothing to flush + for (IndexInfo iInfo : dsInfo.getIndexes().values()) { + if (iInfo.isOpen()) { + maxDiskLastLsn = resourceId2MaxLSNMap.get(iInfo.getResourceId()); + index = 
iInfo.getIndex(); + AbstractLSMIOOperationCallback ioCallback = + (AbstractLSMIOOperationCallback) index.getIOOperationCallback(); + if (logRecord.getLSN() > maxDiskLastLsn + && !index.isCurrentMutableComponentEmpty()) { + // schedule flush + ioCallback.updateLastLSN(logRecord.getLSN()); + redoFlush(index, logRecord); + redoCount++; + } else { + if (index.isMemoryComponentsAllocated()) { + // if the memory component has been allocated, we + // force it to receive the same Id + index.getCurrentMemoryComponent().resetId(idGenerator.getId(), true); + } else { + // otherwise, we refresh the id stored in ioCallback + // to ensure the memory component receives correct Id upon activation + ioCallback.forceRefreshNextId(); + } + } + } + } + } + break; case LogType.JOB_COMMIT: case LogType.ENTITY_COMMIT: case LogType.ABORT: - case LogType.FLUSH: case LogType.WAIT: case LogType.MARKER: //do nothing @@ -736,6 +785,23 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } + private static void redoFlush(ILSMIndex index, ILogRecord logRecord) throws HyracksDataException { + ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + long minId = logRecord.getFlushingComponentMinId(); + long maxId = logRecord.getFlushingComponentMaxId(); + ILSMComponentId id = new LSMComponentId(minId, maxId); + if (!index.getDiskComponents().isEmpty()) { + ILSMDiskComponent diskComponent = index.getDiskComponents().get(0); + ILSMComponentId maxDiskComponentId = diskComponent.getId(); + if (maxDiskComponentId.compareTo(id) != IdCompareResult.LESS_THAN) { + throw new IllegalStateException("Illegal state of component Id. 
Max disk component Id " + + maxDiskComponentId + " should be less than redo flush component Id " + id); + } + } + index.getCurrentMemoryComponent().resetId(id, true); + accessor.scheduleFlush(index.getIOOperationCallback()); + } + private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; private final long txnId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 6c4d068..f99429c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -121,8 +121,8 @@ public class TestNodeController { protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; protected static TransactionProperties txnProperties; - private static final boolean cleanupOnStart = true; - private static final boolean cleanupOnStop = true; + private static final boolean CLEANUP_ON_START = true; + private static final boolean CLEANUP_ON_STOP = true; // Constants public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; @@ -142,6 +142,10 @@ public class TestNodeController { } public void init() throws Exception { + init(CLEANUP_ON_START); + } + + public void init(boolean cleanupOnStart) throws Exception { try { File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); @@ -157,6 +161,10 @@ public class TestNodeController { } public void deInit() throws Exception { + deInit(CLEANUP_ON_STOP); + } + + public void deInit(boolean cleanupOnStop) throws Exception { ExternalUDFLibrarian.removeLibraryDir(); 
ExecutionTestUtil.tearDown(cleanupOnStop); } @@ -165,6 +173,10 @@ public class TestNodeController { options.addAll(opts); } + public void clearOpts() { + options.clear(); + } + public TxnId getTxnJobId(IHyracksTaskContext ctx) { return getTxnJobId(ctx.getJobletContext().getJobId()); } @@ -241,10 +253,15 @@ public class TestNodeController { SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider); + + IModificationOperationCallbackFactory secondaryModCallbackFactory = + dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex, + IndexOperation.INSERT, primaryKeyIndexes); + LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false, - secondaryIndexHelperFactory, NoOpOperationCallbackFactory.INSTANCE, null); + secondaryIndexHelperFactory, secondaryModCallbackFactory, null); assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc); CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(), @@ -465,12 +482,12 @@ public class TestNodeController { } public static class SecondaryIndexInfo { - private int[] primaryKeyIndexes; - private PrimaryIndexInfo primaryIndexInfo; - private Index secondaryIndex; - private ConstantFileSplitProvider fileSplitProvider; - private RecordDescriptor rDesc; - private int[] insertFieldsPermutations; + private final int[] primaryKeyIndexes; + private final PrimaryIndexInfo primaryIndexInfo; + private final Index secondaryIndex; + private final ConstantFileSplitProvider fileSplitProvider; + 
private final RecordDescriptor rDesc; + private final int[] insertFieldsPermutations; public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) { this.primaryIndexInfo = primaryIndexInfo; @@ -504,20 +521,20 @@ public class TestNodeController { } public static class PrimaryIndexInfo { - private Dataset dataset; - private IAType[] primaryKeyTypes; - private ARecordType recordType; - private ARecordType metaType; - private ILSMMergePolicyFactory mergePolicyFactory; - private Map<String, String> mergePolicyProperties; - private int primaryIndexNumOfTupleFields; - private ITypeTraits[] primaryIndexTypeTraits; - private ISerializerDeserializer<?>[] primaryIndexSerdes; - private ConstantFileSplitProvider fileSplitProvider; - private RecordDescriptor rDesc; - private int[] primaryIndexInsertFieldsPermutations; - private int[] primaryKeyIndexes; - private Index index; + private final Dataset dataset; + private final IAType[] primaryKeyTypes; + private final ARecordType recordType; + private final ARecordType metaType; + private final ILSMMergePolicyFactory mergePolicyFactory; + private final Map<String, String> mergePolicyProperties; + private final int primaryIndexNumOfTupleFields; + private final ITypeTraits[] primaryIndexTypeTraits; + private final ISerializerDeserializer<?>[] primaryIndexSerdes; + private final ConstantFileSplitProvider fileSplitProvider; + private final RecordDescriptor rDesc; + private final int[] primaryIndexInsertFieldsPermutations; + private final int[] primaryKeyIndexes; + private final Index index; public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, @@ -563,6 +580,10 @@ public class TestNodeController { return index; } + public Dataset getDataset() { + return dataset; + } + public IRecordDescriptorProvider getInsertRecordDescriptorProvider() { IRecordDescriptorProvider 
rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index c6232f5..9ef531e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -47,6 +47,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; @@ -282,7 +283,7 @@ public class ComponentRollbackTest { lsmAccessor.deleteComponents( c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified())); // now that the rollback has completed, we will unblock the search - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // search now and ensure @@ -303,7 +304,7 @@ public class ComponentRollbackTest { DiskComponentLsnPredicate pred = new 
DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); // now that the rollback has completed, we will unblock the search - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(secondSearcher.result()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, @@ -477,7 +478,7 @@ public class ComponentRollbackTest { Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate); //unblock the flush lsmBtree.allowFlush(1); - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // ensure current mem component is not modified @@ -535,7 +536,7 @@ public class ComponentRollbackTest { // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate); // The rollback will be waiting for the flush to complete - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); //unblock the flush @@ -606,7 +607,7 @@ public class ComponentRollbackTest { // unblock the merge lsmBtree.allowMerge(1); // unblock the search - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); rollerback.complete(); @@ -673,7 +674,7 @@ public class ComponentRollbackTest { // now that we enetered, we will rollback Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); // unblock the search - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // even though 
rollback has been called, it is still waiting for the merge to complete http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java new file mode 100644 index 0000000..b10e9b1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.dataflow; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; +import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo; +import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.nc.NCAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.config.StorageProperties.Option; +import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.external.util.DataflowUtils; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.test.common.TestHelper; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback; +import 
org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; +import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class LSMFlushRecoveryTest { + + private static TestNodeController nc; + private static Dataset dataset; + private static PrimaryIndexInfo primaryIndexInfo; + private static SecondaryIndexInfo secondaryIndexInfo; + private static TestLsmBtree primaryIndex; + private static TestLsmBtree secondaryIndex; + private static Index secondaryIndexEntity; + private static NCAppRuntimeContext ncAppCtx; + private static IDatasetLifecycleManager dsLifecycleMgr; + + private static IHyracksTaskContext ctx; + private static IIndexDataflowHelper primaryIndexDataflowHelper; + private static IIndexDataflowHelper secondaryIndexDataflowHelper; + private static ITransactionContext txnCtx; + private static LSMInsertDeleteOperatorNodePushable insertOp; + private static final int PARTITION = 0; + private static TupleGenerator tupleGenerator; + + private static final String SECONDARY_INDEX_NAME = "TestIdx"; + private static final IndexType SECONDARY_INDEX_TYPE = IndexType.BTREE; + private static final List<List<String>> SECONDARY_INDEX_FIELD_NAMES = + Arrays.asList(Arrays.asList(StorageTestUtils.RECORD_TYPE.getFieldNames()[1])); + private static final List<Integer> SECONDARY_INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR); + private static final List<IAType> SECONDARY_INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64); + + @BeforeClass + public static void setUp() throws Exception { + System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" + + 
File.separator + "resources" + File.separator + "cc.conf"; + nc = new TestNodeController(configPath, false); + } + + @Before + public void initializeTest() throws Exception { + // initialize NC before each test + initializeNc(true); + initializeTestCtx(); + createIndex(); + readIndex(); + insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity); + tupleGenerator = StorageTestUtils.getTupleGenerator(); + } + + @After + public void testRecovery() { + try { + // right now we've inserted 1000 records to the index, and each record is at least 12 bytes. + // thus, the memory component size is at least 12KB. + List<Pair<IOption, Object>> opts = new ArrayList<>(); + opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, "128MB")); + opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, "10000")); + nc.setOpts(opts); + nc.init(false); + initializeTestCtx(); + readIndex(); + checkComponentIds(); + insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity); + // insert more records + insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT); + checkComponentIds(); + + dropIndex(); + // cleanup after each test case + nc.deInit(true); + nc.clearOpts(); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private void initializeNc(boolean cleanUpOnStart) throws Exception { + nc.init(cleanUpOnStart); + ncAppCtx = nc.getAppRuntimeContext(); + dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager(); + } + + private void createIndex() throws Exception { + primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION); + dataset = primaryIndexInfo.getDataset(); + secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME, + SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS, + SECONDARY_INDEX_FIELD_TYPES, false, false, false, 0); + secondaryIndexInfo = 
nc.createSecondaryIndex(primaryIndexInfo, secondaryIndexEntity, + StorageTestUtils.STORAGE_MANAGER, PARTITION); + } + + private void initializeTestCtx() throws Exception { + JobId jobId = nc.newJobId(); + ctx = nc.createTestContext(jobId, PARTITION, false); + txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), + new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); + } + + private void readIndex() throws HyracksDataException { + IndexDataflowHelperFactory primaryHelperFactory = + new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); + primaryIndexDataflowHelper = primaryHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION); + primaryIndexDataflowHelper.open(); + primaryIndex = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance(); + primaryIndexDataflowHelper.close(); + + IndexDataflowHelperFactory secodnaryIHelperFactory = + new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider()); + secondaryIndexDataflowHelper = + secodnaryIHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION); + secondaryIndexDataflowHelper.open(); + secondaryIndex = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance(); + secondaryIndexDataflowHelper.close(); + } + + private void dropIndex() throws HyracksDataException { + primaryIndexDataflowHelper.destroy(); + secondaryIndexDataflowHelper.destroy(); + } + + @Test + public void testBothFlushSucceed() throws Exception { + insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT); + // shutdown the server + nc.deInit(false); + } + + @Test + public void testSecondaryFlushFails() throws Exception { + insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT); + + primaryIndex.clearFlushCallbacks(); + secondaryIndex.clearFlushCallbacks(); + + Semaphore primaryFlushSemaphore = new Semaphore(0); + 
secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() { + @Override + public void before(Semaphore t) throws HyracksDataException { + throw new HyracksDataException("Kill the flush thread"); + } + + @Override + public void after() throws HyracksDataException { + + } + }); + + primaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE); + primaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() { + @Override + public void before(Void t) throws HyracksDataException { + + } + + @Override + public void after() throws HyracksDataException { + primaryFlushSemaphore.release(); + } + }); + StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true); + + primaryFlushSemaphore.acquire(); + List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents(); + Assert.assertEquals(primaryComponents.size(), secondaryComponents.size() + 1); + // shutdown the NC + nc.deInit(false); + } + + @Test + public void testPrimaryFlushFails() throws Exception { + insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT); + + primaryIndex.clearFlushCallbacks(); + secondaryIndex.clearFlushCallbacks(); + + Semaphore secondaryFlushSemaphore = new Semaphore(0); + + primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() { + @Override + public void before(Semaphore t) throws HyracksDataException { + throw new HyracksDataException("Kill the flush thread"); + } + + @Override + public void after() throws HyracksDataException { + + } + }); + + secondaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE); + secondaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() { + @Override + public void before(Void t) throws HyracksDataException { + + } + + @Override + public void after() throws HyracksDataException { + secondaryFlushSemaphore.release(); + } + }); + StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true); + + 
secondaryFlushSemaphore.acquire(); + List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents(); + Assert.assertEquals(secondaryComponents.size(), primaryComponents.size() + 1); + // shutdown the NC + nc.deInit(false); + } + + @Test + public void testBothFlushFail() throws Exception { + insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT); + + primaryIndex.clearFlushCallbacks(); + secondaryIndex.clearFlushCallbacks(); + + Semaphore primaryFlushSemaphore = new Semaphore(0); + Semaphore secondaryFlushSemaphore = new Semaphore(0); + + primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() { + @Override + public void before(Semaphore t) throws HyracksDataException { + primaryFlushSemaphore.release(); + throw new HyracksDataException("Kill the flush thread"); + } + + @Override + public void after() throws HyracksDataException { + + } + }); + + secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() { + @Override + public void before(Semaphore t) throws HyracksDataException { + secondaryFlushSemaphore.release(); + throw new HyracksDataException("Kill the fluhs thread"); + } + + @Override + public void after() throws HyracksDataException { + + } + }); + StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true); + + primaryFlushSemaphore.acquire(); + secondaryFlushSemaphore.acquire(); + List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents(); + Assert.assertEquals(secondaryComponents.size(), primaryComponents.size()); + // shutdown the NC + nc.deInit(false); + } + + private void insertRecords(int totalNumRecords, int recordsPerComponent) throws Exception { + StorageTestUtils.allowAllOps(primaryIndex); + StorageTestUtils.allowAllOps(secondaryIndex); + insertOp.open(); + VSizeFrame frame = new VSizeFrame(ctx); + 
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); + for (int i = 0; i < totalNumRecords; i++) { + // flush every RECORDS_PER_COMPONENT records + if (i % recordsPerComponent == 0 && i + 1 != totalNumRecords) { + if (tupleAppender.getTupleCount() > 0) { + tupleAppender.write(insertOp, true); + } + StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, false); + } + ITupleReference tuple = tupleGenerator.next(); + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + } + if (tupleAppender.getTupleCount() > 0) { + tupleAppender.write(insertOp, true); + } + insertOp.close(); + nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); + } + + private void checkComponentIds() throws HyracksDataException { + // check memory component + if (primaryIndex.isMemoryComponentsAllocated()) { + ILSMMemoryComponent primaryMemComponent = primaryIndex.getCurrentMemoryComponent(); + ILSMMemoryComponent secondaryMemComponent = secondaryIndex.getCurrentMemoryComponent(); + Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId()); + } + + List<ILSMDiskComponent> primaryDiskComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndex.getDiskComponents(); + + Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size()); + for (int i = 0; i < primaryDiskComponents.size(); i++) { + Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId()); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java index c452548..e4373f6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java @@ -57,6 +57,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; @@ -147,7 +148,7 @@ public class SearchCursorComponentSwitchTest { } void unblockSearch(TestLsmBtree lsmBtree) { - lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); lsmBtree.allowSearch(1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java index e7a455c..d9f2e20 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java @@ -57,6 +57,7 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.test.CountAnswer; import 
org.apache.hyracks.api.test.FrameWriterTestUtils; import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation; +import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -92,25 +93,20 @@ public class StorageTestUtils { NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, PARTITIONING_KEYS, null, null, null, false, null), null, DatasetType.INTERNAL, DATASET_ID, 0); - public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() { - @Override - public void before(Semaphore smeaphore) { - smeaphore.release(); - } - - @Override - public void after() { - } - }; private StorageTestUtils() { } static void allowAllOps(TestLsmBtree lsmBtree) { - lsmBtree.addModifyCallback(ALLOW_CALLBACK); - lsmBtree.addFlushCallback(ALLOW_CALLBACK); - lsmBtree.addSearchCallback(ALLOW_CALLBACK); - lsmBtree.addMergeCallback(ALLOW_CALLBACK); + lsmBtree.clearModifyCallbacks(); + lsmBtree.clearFlushCallbacks(); + lsmBtree.clearSearchCallbacks(); + lsmBtree.clearMergeCallbacks(); + + lsmBtree.addModifyCallback(AllowTestOpCallback.INSTANCE); + lsmBtree.addFlushCallback(AllowTestOpCallback.INSTANCE); + lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE); + lsmBtree.addMergeCallback(AllowTestOpCallback.INSTANCE); } public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition) @@ -121,8 +117,13 @@ public class StorageTestUtils { public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return getInsertPipeline(nc, ctx, null); + } + + public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, 
IHyracksTaskContext ctx, + Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, - KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft(); + KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft(); } public static TupleGenerator getTupleGenerator() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 1a61b8f..9de8f73 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -49,6 +49,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.ILocalResourceRepository; @@ -330,6 +331,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) { DatasetResource dataset = datasets.get(datasetId); + if (dataset == null) { + return null; + } 
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition); if (generator == null) { populateOpTrackerAndIdGenerator(dataset, partition); @@ -425,12 +429,26 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } int partition = primaryOpTracker.getPartition(); Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); + ILSMIndex flushIndex = null; + for (ILSMIndex lsmIndex : indexes) { + if (!lsmIndex.isCurrentMutableComponentEmpty()) { + flushIndex = lsmIndex; + break; + } + } + if (flushIndex == null) { + // all open indexes are empty, nothing to flush + continue; + } + LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId(); ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition); idGenerator.refresh(); if (dsInfo.isDurable()) { + synchronized (logRecord) { - TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null); + TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition, + componentId.getMinId(), componentId.getMaxId(), null); try { logManager.log(logRecord); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index f7f2806..8ed4bb6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -38,6 +38,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -117,6 +118,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { if (needsFlush || flushOnExit) { //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled. + LSMComponentId primaryId = null; for (ILSMIndex lsmIndex : indexes) { ILSMOperationTracker opTracker = lsmIndex.getOperationTracker(); synchronized (opTracker) { @@ -124,8 +126,14 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { if (memComponent.getState() == ComponentState.READABLE_WRITABLE && memComponent.isModified()) { memComponent.setState(ComponentState.READABLE_UNWRITABLE); } + if (lsmIndex.isPrimaryIndex()) { + primaryId = (LSMComponentId) memComponent.getId(); + } } } + if (primaryId == null) { + throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID()); + } LogRecord logRecord = new LogRecord(); flushOnExit = false; if (dsInfo.isDurable()) { @@ -133,7 +141,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { * Generate a FLUSH log. * Flush will be triggered when the log is written to disk by LogFlusher. 
*/ - TransactionUtil.formFlushLogRecord(logRecord, datasetID, this); + TransactionUtil.formFlushLogRecord(logRecord, datasetID, partition, primaryId.getMinId(), + primaryId.getMaxId(), this); try { logManager.log(logRecord); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index bacebf1..412981c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -213,6 +213,13 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } } + /** + * Used during the recovery process to force refresh the next component id + */ + public void forceRefreshNextId() { + nextComponentIds[writeIndex] = idGenerator.getId(); + } + public synchronized void setFirstLSN(long firstLSN) { // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly firstLSNs[writeIndex] = firstLSN; @@ -258,7 +265,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC @Override public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException { ILSMComponentId componentId = getLSMComponentId(); - component.resetId(componentId); + component.resetId(componentId, false); if (componentSwitched) { recycleIndex = (recycleIndex + 1) % nextComponentIds.length; } @@ -269,7 +276,7 @@ public abstract class 
AbstractLSMIOOperationCallback implements ILSMIOOperationC if (component == lsmIndex.getCurrentMemoryComponent()) { // only set the component id for the first (current) memory component ILSMComponentId componentId = getLSMComponentId(); - component.resetId(componentId); + component.resetId(componentId, false); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 7ddfdfb..e58a6fa 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -46,6 +46,8 @@ public interface ILogRecord { int SEQ_NUM_LEN = Long.BYTES; int TYPE_LEN = Byte.BYTES; int UUID_LEN = Long.BYTES; + int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES; + int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES; int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES; int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN; @@ -55,7 +57,8 @@ public interface ILogRecord { int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN; int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER; - int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DatasetId.BYTES + CHKSUM_LEN; + int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN + + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN; int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; int MARKER_BASE_LOG_SIZE = 
ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN; @@ -176,4 +179,12 @@ public interface ILogRecord { * @return the flag */ boolean isReplicate(); + + long getFlushingComponentMinId(); + + void setFlushingComponentMinId(long flushingComponentMinId); + + long getFlushingComponentMaxId(); + + void setFlushingComponentMaxId(long flushingComponentMaxId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index 743a3fe..7e61266 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -81,6 +81,8 @@ public class LogRecord implements ILogRecord { private long checksum; private long prevMarkerLSN; private ByteBuffer marker; + private long flushingComponentMinId; + private long flushingComponentMaxId; // ------------- fields in a log record (end) --------------// private final ILogMarkerCallback callback; // A callback for log mark operations private int PKFieldCnt; @@ -141,6 +143,9 @@ public class LogRecord implements ILogRecord { break; case LogType.FLUSH: buffer.putInt(datasetId); + buffer.putInt(resourcePartition); + buffer.putLong(flushingComponentMinId); + buffer.putLong(flushingComponentMaxId); break; case LogType.MARKER: buffer.putInt(datasetId); @@ -238,13 +243,23 @@ public class LogRecord implements ILogRecord { txnId = buffer.getLong(); switch (logType) { case LogType.FLUSH: - if (buffer.remaining() < ILogRecord.DS_LEN) { + if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + 
FLUSHING_COMPONENT_MINID_LEN + + FLUSHING_COMPONENT_MAXID_LEN) { return RecordReadStatus.TRUNCATED; } datasetId = buffer.getInt(); + resourcePartition = buffer.getInt(); + flushingComponentMinId = buffer.getLong(); + flushingComponentMaxId = buffer.getLong(); resourceId = 0l; - // fall throuh + computeAndSetLogSize(); + break; case LogType.WAIT: + if (buffer.remaining() < ILogRecord.DS_LEN) { + return RecordReadStatus.TRUNCATED; + } + datasetId = buffer.getInt(); + resourceId = 0l; computeAndSetLogSize(); break; case LogType.JOB_COMMIT: @@ -710,4 +725,24 @@ public class LogRecord implements ILogRecord { public void setRequester(ILogRequester requester) { this.requester = requester; } + + @Override + public long getFlushingComponentMinId() { + return flushingComponentMinId; + } + + @Override + public void setFlushingComponentMinId(long flushingComponentMinId) { + this.flushingComponentMinId = flushingComponentMinId; + } + + @Override + public long getFlushingComponentMaxId() { + return flushingComponentMaxId; + } + + @Override + public void setFlushingComponentMaxId(long flushingComponentMaxId) { + this.flushingComponentMaxId = flushingComponentMaxId; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java index c3af0f3..690eeb6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java @@ -45,10 +45,14 @@ public class TransactionUtil { logRecord.computeAndSetLogSize(); } - public static void formFlushLogRecord(LogRecord logRecord, int datasetId, 
PrimaryIndexOperationTracker opTracker) { + public static void formFlushLogRecord(LogRecord logRecord, int datasetId, int resourcePartition, + long flushingComponentMinId, long flushingComponentMaxId, PrimaryIndexOperationTracker opTracker) { logRecord.setLogType(LogType.FLUSH); logRecord.setTxnId(-1); logRecord.setDatasetId(datasetId); + logRecord.setResourcePartition(resourcePartition); + logRecord.setFlushingComponentMinId(flushingComponentMinId); + logRecord.setFlushingComponentMaxId(flushingComponentMaxId); logRecord.setOpTracker(opTracker); logRecord.computeAndSetLogSize(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java index 1c57d1b..2ab5b4e 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java @@ -263,9 +263,11 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { private void checkMemoryComponent(ILSMComponentId expected, ILSMMemoryComponent memoryComponent) throws HyracksDataException { - ArgumentCaptor<ILSMComponentId> argument = ArgumentCaptor.forClass(ILSMComponentId.class); - Mockito.verify(memoryComponent).resetId(argument.capture()); - assertEquals(expected, argument.getValue()); + ArgumentCaptor<ILSMComponentId> idArgument = ArgumentCaptor.forClass(ILSMComponentId.class); + ArgumentCaptor<Boolean> forceArgument = ArgumentCaptor.forClass(Boolean.class); + 
Mockito.verify(memoryComponent).resetId(idArgument.capture(), forceArgument.capture()); + assertEquals(expected, idArgument.getValue()); + assertEquals(false, forceArgument.getValue().booleanValue()); Mockito.reset(memoryComponent); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java index 5009614..6189e37 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java @@ -67,7 +67,9 @@ public class RemoteLogsProcessor implements ILogRequester { break; case LogType.FLUSH: RemoteLogRecord flushLog = new RemoteLogRecord(); - TransactionUtil.formFlushLogRecord(flushLog, reusableLog.getDatasetId(), null); + TransactionUtil.formFlushLogRecord(flushLog, reusableLog.getDatasetId(), + reusableLog.getResourcePartition(), reusableLog.getFlushingComponentMinId(), + reusableLog.getFlushingComponentMaxId(), null); flushLog.setRequester(this); flushLog.setLogSource(LogSource.REMOTE); flushLog.setMasterLsn(reusableLog.getLSN()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java index c72d402..4ff6377 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java @@ -107,7 +107,9 @@ public interface ILSMMemoryComponent extends ILSMComponent { * Reset the component Id of the memory component after it's recycled * * @param newId + * @param force + * Whether to force reset the Id to skip sanity checks * @throws HyracksDataException */ - void resetId(ILSMComponentId newId) throws HyracksDataException; + void resetId(ILSMComponentId newId, boolean force) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 3fbef18..9596495 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -296,8 +296,8 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im } @Override - public void resetId(ILSMComponentId componentId) throws 
HyracksDataException { - if (this.componentId != null && !componentId.missing() // for backward compatibility + public void resetId(ILSMComponentId componentId, boolean force) throws HyracksDataException { + if (!force && this.componentId != null && !componentId.missing() // for backward compatibility && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) { throw new IllegalStateException( this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java new file mode 100644 index 0000000..19a9872 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.btree.impl; + +import java.util.concurrent.Semaphore; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AllowTestOpCallback implements ITestOpCallback<Semaphore> { + + public static final ITestOpCallback<Semaphore> INSTANCE = new AllowTestOpCallback(); + + private AllowTestOpCallback() { + } + + @Override + public void before(Semaphore t) throws HyracksDataException { + t.release(); + } + + @Override + public void after() throws HyracksDataException { + + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3fe67422/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index 3c781a6..1d4b7d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -53,6 +53,7 @@ import org.apache.hyracks.util.trace.ITracer; public class 
TestLsmBtree extends LSMBTree { // Semaphores are used to control operations + // Operations are allowed by default. private final Semaphore modifySemaphore = new Semaphore(0); private final Semaphore searchSemaphore = new Semaphore(0); private final Semaphore flushSemaphore = new Semaphore(0); @@ -91,6 +92,11 @@ public class TestLsmBtree extends LSMBTree { filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy, opTracker, ioScheduler, ioOperationCallbackFactory, needKeyDupCheck, btreeFields, filterFields, durable, updateAware, tracer); + + addModifyCallback(AllowTestOpCallback.INSTANCE); + addSearchCallback(AllowTestOpCallback.INSTANCE); + addFlushCallback(AllowTestOpCallback.INSTANCE); + addMergeCallback(AllowTestOpCallback.INSTANCE); } @Override @@ -226,13 +232,13 @@ public class TestLsmBtree extends LSMBTree { } public void addModifyCallback(ITestOpCallback<Semaphore> modifyCallback) { - synchronized (mergeCallbacks) { + synchronized (modifyCallbacks) { modifyCallbacks.add(modifyCallback); } } public void clearModifyCallbacks() { - synchronized (mergeCallbacks) { + synchronized (modifyCallbacks) { modifyCallbacks.clear(); } } @@ -329,6 +335,18 @@ public class TestLsmBtree extends LSMBTree { } } + public void addIoAfterFinalizeCallback(ITestOpCallback<Void> callback) { + synchronized (ioAfterFinalizeCallbacks) { + ioAfterFinalizeCallbacks.add(callback); + } + } + + public void clearIoAfterFinalizeCallbacks() { + synchronized (ioAfterFinalizeCallbacks) { + ioAfterFinalizeCallbacks.clear(); + } + } + @Override public void allocateMemoryComponents() throws HyracksDataException { synchronized (allocateComponentCallbacks) {