This is an automated email from the ASF dual-hosted git repository.
peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new b9ce5f159d [NO ISSUE][MTD][TX] Implement atomic metadata transactions
without WAL
b9ce5f159d is described below
commit b9ce5f159d83cda175aa789b42e197b081deb1f0
Author: Peeyush Gupta <[email protected]>
AuthorDate: Wed Jul 26 13:13:41 2023 -0700
[NO ISSUE][MTD][TX] Implement atomic metadata transactions without WAL
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
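- With this change, metadata transactions without WAL are made atomic on
cloud deployments: a per-transaction log file is persisted before commit
and is used during recovery to roll back incomplete transactions.
- Some refactoring related to the global transaction manager (the datasetId
parameter is removed from handleJobPreparedMessage and
AtomicJobPreparedMessage).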
Change-Id: I282e0e4ca8a9bff68fa88613b9d34b14bc2b764c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17657
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Peeyush Gupta <[email protected]>
---
.../org/apache/asterix/app/cc/GlobalTxManager.java | 3 +-
.../org/apache/asterix/app/nc/RecoveryManager.java | 9 ++
.../apache/asterix/test/txn/LogManagerTest.java | 2 +-
.../asterix/common/cluster/IGlobalTxManager.java | 3 +-
.../LSMInsertDeleteOperatorNodePushable.java | 4 +-
.../common/messaging/AtomicJobPreparedMessage.java | 7 +-
.../common/transactions/ITransactionManager.java | 5 +
.../asterix/common/utils/StorageConstants.java | 1 +
.../org/apache/asterix/metadata/MetadataNode.java | 10 ++
.../metadata/bootstrap/MetadataBootstrap.java | 2 +-
.../LSMPrimaryInsertOperatorNodePushable.java | 4 +-
.../LSMPrimaryUpsertOperatorNodePushable.java | 4 +-
.../transaction/AtomicNoWALTransactionContext.java | 108 ++++++++++++++++++++-
.../transaction/MetadataAtomicTransactionLog.java | 64 ++++++++++++
.../transaction/TransactionContextFactory.java | 5 +-
.../service/transaction/TransactionManager.java | 31 +++++-
16 files changed, 237 insertions(+), 25 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index d69376aabf..f6456dc942 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -104,8 +104,7 @@ public class GlobalTxManager implements IGlobalTxManager {
}
@Override
- public void handleJobPreparedMessage(JobId jobId, String nodeId, int
datasetId,
- Map<String, ILSMComponentId> componentIdMap) {
+ public void handleJobPreparedMessage(JobId jobId, String nodeId,
Map<String, ILSMComponentId> componentIdMap) {
IGlobalTransactionContext context = txnContextRepository.get(jobId);
if (context == null) {
LOGGER.warn("JobPreparedMessage received for jobId " + jobId
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123df3..54942509e3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,6 +61,7 @@ import
org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -157,6 +158,9 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
@Override
public void startLocalRecovery(Set<Integer> partitions) throws
IOException, ACIDException {
state = SystemState.RECOVERING;
+ if (appCtx.isCloudDeployment()) {
+ doMetadataRecovery();
+ }
LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -171,6 +175,11 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
replayPartitionsLogs(partitions, logMgr.getLogReader(true),
lowWaterMarkLSN, true);
}
+ public synchronized void doMetadataRecovery() {
+ LOGGER.info("starting recovery for metadata partition {}",
StorageConstants.METADATA_PARTITION);
+
appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+ }
+
public synchronized void replayPartitionsLogs(Set<Integer> partitions,
ILogReader logReader, long lowWaterMarkLSN,
boolean closeOnFlushRedo) throws IOException, ACIDException {
try {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index fb08fe35a8..cea3832a57 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -175,7 +175,7 @@ public class LogManagerTest {
LogRecord logRecord = new LogRecord();
final long txnId = 1;
logRecord.setTxnCtx(TransactionContextFactory.create(new TxnId(txnId),
- new
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)));
+ new
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL), ncAppCtx));
logRecord.setLogSource(LogSource.LOCAL);
logRecord.setLogType(LogType.WAIT);
logRecord.setTxnId(txnId);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
index 498d174d3f..956ae9ecf7 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -46,8 +46,7 @@ public interface IGlobalTxManager extends
IJobLifecycleListener {
IGlobalTransactionContext getTransactionContext(JobId jobId) throws
ACIDException;
- void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
- Map<String, ILSMComponentId> componentIdMap);
+ void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String,
ILSMComponentId> componentIdMap);
void handleJobCompletionMessage(JobId jobId, String nodeId);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 64748ce923..1a23ca857c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -249,7 +249,6 @@ public class LSMInsertDeleteOperatorNodePushable extends
LSMIndexInsertUpdateDel
private void commitAtomicInsertDelete() throws HyracksDataException {
if (isPrimary) {
final Map<String, ILSMComponentId> componentIdMap = new
HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -259,14 +258,13 @@ public class LSMInsertDeleteOperatorNodePushable extends
LSMIndexInsertUpdateDel
for (Map.Entry<String, FlushOperation> entry :
opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(),
entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-
ctx.getJobletContext().getServiceContext().getNodeId(), datasetID,
componentIdMap);
+
ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
try {
((NodeControllerService)
ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
index 8adbf49da6..b4832ff4fc 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -38,20 +38,17 @@ public class AtomicJobPreparedMessage implements
ICcAddressedMessage {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final String nodeId;
- private final int datasetId;
private final Map<String, ILSMComponentId> componentIdMap;
- public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
- Map<String, ILSMComponentId> componentIdMap) {
+ public AtomicJobPreparedMessage(JobId jobId, String nodeId, Map<String,
ILSMComponentId> componentIdMap) {
this.nodeId = nodeId;
- this.datasetId = datasetId;
this.componentIdMap = componentIdMap;
this.jobId = jobId;
}
@Override
public void handle(ICcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
- appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId,
datasetId, componentIdMap);
+ appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId,
componentIdMap);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 30c693d219..66b5359944 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -112,4 +112,9 @@ public interface ITransactionManager {
*/
void ensureMaxTxnId(long txnId);
+ /**
+ * Rollback incomplete metadata transactions without WAL during recovery.
+ */
+ void rollbackMetadataTransactionsWithoutWAL();
+
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96306..ed9c48e68b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -30,6 +30,7 @@ import
org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFacto
*/
public class StorageConstants {
+ public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 522fd32556..b40a4eb07e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -128,6 +128,7 @@ import
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1742,6 +1743,12 @@ public class MetadataNode implements IMetadataNode {
return sb.toString();
}
+ private void setAtomicOpContext(IIndexAccessor accessor) {
+ Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+
indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+ ((ILSMIndexAccessor)
accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+ }
+
private <T> void searchIndex(TxnId txnId, IMetadataIndex index,
ITupleReference searchKey,
IValueExtractor<T> valueExtractor, List<T> results) throws
AlgebricksException, HyracksDataException {
IBinaryComparatorFactory[] comparatorFactories =
index.getKeyBinaryComparatorFactory();
@@ -1753,6 +1760,9 @@ public class MetadataNode implements IMetadataNode {
IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor =
indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ if (atomicNoWAL) {
+ setAtomicOpContext(indexAccessor);
+ }
try {
IBinaryComparator[] searchCmps = null;
MultiComparator searchCmp = null;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c1eba7c0cd..be935d0fce 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -433,7 +433,7 @@ public class MetadataBootstrap {
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
bloomFilterKeyFields,
bloomFilterFalsePositiveRate, true, null,
NoOpCompressorDecompressorFactory.INSTANCE, true,
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL),
NullIntrospector.INSTANCE,
- false, false);
+ false, appContext.isCloudDeployment());
DatasetLocalResourceFactory dsLocalResourceFactory =
new DatasetLocalResourceFactory(datasetId,
lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same
code path as
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index ec016bcf9d..2c89b614b1 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -352,7 +352,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
private void commitAtomicInsert() throws HyracksDataException {
final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -362,14 +361,13 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
for (Map.Entry<String, FlushOperation> entry :
opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(),
entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
- ctx.getJobletContext().getServiceContext().getNodeId(),
datasetID, componentIdMap);
+ ctx.getJobletContext().getServiceContext().getNodeId(),
componentIdMap);
try {
((NodeControllerService)
ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index de95d60593..6b9d3ebaf2 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -570,7 +570,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
// TODO: Refactor and remove duplicated code
private void commitAtomicUpsert() throws HyracksDataException {
final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -580,14 +579,13 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
for (Map.Entry<String, FlushOperation> entry :
opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(),
entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
- ctx.getJobletContext().getServiceContext().getNodeId(),
datasetID, componentIdMap);
+ ctx.getJobletContext().getServiceContext().getNodeId(),
componentIdMap);
try {
((NodeControllerService)
ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index 3576ba1388..aa698beecc 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,25 +18,46 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.util.annotations.ThreadSafe;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
@ThreadSafe
public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
- public AtomicNoWALTransactionContext(TxnId txnId) {
+ private final INcApplicationContext appCtx;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public AtomicNoWALTransactionContext(TxnId txnId, INcApplicationContext
appCtx) {
super(txnId);
+ this.appCtx = appCtx;
}
@Override
@@ -59,7 +80,7 @@ public class AtomicNoWALTransactionContext extends
AtomicTransactionContext {
for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
try {
- primaryIndexOpTracker.deleteMemoryComponent(true);
+ primaryIndexOpTracker.abort();
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
@@ -68,19 +89,102 @@ public class AtomicNoWALTransactionContext extends
AtomicTransactionContext {
private void ensureDurable() {
List<FlushOperation> flushes = new ArrayList<>();
+ List<Integer> datasetIds = new ArrayList<>();
+ Map<String, ILSMComponentId> resourceMap = new HashMap<>();
LogRecord dummyLogRecord = new LogRecord();
try {
for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+
datasetIds.add(primaryIndexOpTracker.getDatasetInfo().getDatasetID());
+ for (Map.Entry<String, FlushOperation> entry :
primaryIndexOpTracker.getLastFlushOperation()
+ .entrySet()) {
+ resourceMap.put(entry.getKey(),
entry.getValue().getFlushingComponent().getId());
+ }
}
LSMIndexUtil.waitFor(flushes);
+ persistLogFile(datasetIds, resourceMap);
+ } catch (Exception e) {
+ deleteUncommittedRecords();
+ throw new ACIDException(e);
+ }
+ try {
+ commit();
+ } catch (Exception e) {
+ rollback(resourceMap);
+ throw new ACIDException(e);
+ } finally {
+ deleteLogFile();
+ }
+ enableMerge();
+ }
+
+ private void persistLogFile(List<Integer> datasetIds, Map<String,
ILSMComponentId> resourceMap)
+ throws HyracksDataException, JsonProcessingException {
+ IIOManager ioManager = appCtx.getIoManager();
+ FileReference fref =
ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX +
StorageConstants.METADATA_PARTITION,
+ String.format("%s.log", txnId)).toString());
+ MetadataAtomicTransactionLog txnLog = new
MetadataAtomicTransactionLog(txnId, datasetIds,
+ appCtx.getServiceContext().getNodeId(), resourceMap);
+ ioManager.overwrite(fref,
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+ }
+
+ public void deleteLogFile() {
+ IIOManager ioManager = appCtx.getIoManager();
+ try {
+ FileReference fref =
ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX +
StorageConstants.METADATA_PARTITION,
+ String.format("%s.log", txnId)).toString());
+ ioManager.delete(fref);
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
}
+ private void commit() throws HyracksDataException {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
+ primaryIndexOpTracker.commit();
+ }
+ }
+
+ private void enableMerge() {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
+ for (IndexInfo indexInfo :
primaryIndexOpTracker.getDatasetInfo().getIndexes().values()) {
+ if (indexInfo.getIndex().isPrimaryIndex()) {
+ try {
+
indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(),
false);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+ }
+ }
+
+ public void rollback(Map<String, ILSMComponentId> resourceMap) {
+ deleteUncommittedRecords();
+ IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ datasetLifecycleManager.getIndexCheckpointManagerProvider();
+ resourceMap.forEach((k, v) -> {
+ try {
+ IIndexCheckpointManager checkpointManager =
indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ if (checkpointManager.getCheckpointCount() > 0) {
+ IndexCheckpoint checkpoint = checkpointManager.getLatest();
+ if (checkpoint.getLastComponentId() == v.getMaxId()) {
+ checkpointManager.deleteLatest(v.getMaxId(), 1);
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ });
+ }
+
@Override
public boolean hasWAL() {
return false;
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
new file mode 100644
index 0000000000..7b3af3f76a
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MetadataAtomicTransactionLog {
+
+ private TxnId txnId;
+ private List<Integer> datasetIds;
+ private String nodeId;
+ private Map<String, ILSMComponentId> resourceMap;
+
+ @JsonCreator
+ public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
+ @JsonProperty("datasetIds") List<Integer> datasetIds,
@JsonProperty("nodeId") String nodeId,
+ @JsonProperty("resourceMap") Map<String, ILSMComponentId>
resourceMap) {
+ this.txnId = txnId;
+ this.datasetIds = datasetIds;
+ this.nodeId = nodeId;
+ this.resourceMap = resourceMap;
+ }
+
+ public TxnId getTxnId() {
+ return txnId;
+ }
+
+ public List<Integer> getDatasetIds() {
+ return datasetIds;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public Map<String, ILSMComponentId> getResourceMap() {
+ return resourceMap;
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 1076086346..e1e2ca8156 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -20,6 +20,7 @@ package
org.apache.asterix.transaction.management.service.transaction;
import static
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
@@ -29,13 +30,13 @@ public class TransactionContextFactory {
private TransactionContextFactory() {
}
- public static ITransactionContext create(TxnId txnId, TransactionOptions
options) {
+ public static ITransactionContext create(TxnId txnId, TransactionOptions
options, INcApplicationContext appCtx) {
final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
switch (atomicityLevel) {
case ATOMIC:
return new AtomicTransactionContext(txnId);
case ATOMIC_NO_WAL:
- return new AtomicNoWALTransactionContext(txnId);
+ return new AtomicNoWALTransactionContext(txnId, appCtx);
case ENTITY_LEVEL:
return new EntityLevelTransactionContext(txnId);
default:
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 8bbfbfdb85..ea7fcb89c8 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -22,6 +22,7 @@ import static
org.apache.asterix.transaction.management.service.transaction.Tran
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,14 +35,19 @@ import
org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
@ThreadSafe
public class TransactionManager implements ITransactionManager,
ILifeCycleComponent {
@@ -61,7 +67,7 @@ public class TransactionManager implements
ITransactionManager, ILifeCycleCompon
if (txnCtx != null) {
throw new ACIDException("Transaction with the same (" + txnId + ")
already exists");
}
- txnCtx = TransactionContextFactory.create(txnId, options);
+ txnCtx = TransactionContextFactory.create(txnId, options,
txnSubsystem.getApplicationContext());
txnCtxRepository.put(txnId, txnCtx);
ensureMaxTxnId(txnId.getId());
return txnCtx;
@@ -187,4 +193,27 @@ public class TransactionManager implements
ITransactionManager, ILifeCycleCompon
LOGGER.log(Level.WARN, "exception while dumping state", e);
}
}
+
+ @Override
+ public void rollbackMetadataTransactionsWithoutWAL() {
+ IIOManager ioManager =
txnSubsystem.getApplicationContext().getIoManager();
+ try {
+ Set<FileReference> txnLogFileRefs =
+ ioManager.list(ioManager.resolve(Paths
+ .get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX +
StorageConstants.METADATA_PARTITION)
+ .toString()));
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (FileReference txnLogFileRef : txnLogFileRefs) {
+ MetadataAtomicTransactionLog atomicTransactionLog =
objectMapper.readValue(
+ new String(ioManager.readAllBytes(txnLogFileRef)),
MetadataAtomicTransactionLog.class);
+ AtomicNoWALTransactionContext context = new
AtomicNoWALTransactionContext(
+ atomicTransactionLog.getTxnId(),
txnSubsystem.getApplicationContext());
+ context.rollback(atomicTransactionLog.getResourceMap());
+ context.deleteLogFile();
+ }
+ } catch (Exception e) {
+ throw new ACIDException(e);
+ }
+ }
}