Repository: asterixdb Updated Branches: refs/heads/master 893d385f7 -> 5070d633e
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java index 0b96164..972668a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java @@ -32,6 +32,6 @@ public class SecondaryIndexSearchOperationCallbackFactory implements ISearchOper @Override public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException { - return new SecondaryIndexSearchOperationCallback(); + return new SecondaryIndexSearchOperationCallback(resourceId); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java index 79ce788..735d7ea 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java @@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.AbstractOperationCallback; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; @@ -67,12 +66,12 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); - txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true); + txnCtx.register(resource.getId(), index, modCallback, true); return modCallback; } catch (ACIDException e) { throw new HyracksDataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java index 8a27914..b744606 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java @@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.AbstractOperationCallback; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; @@ -69,11 +68,11 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); - txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false); + txnCtx.register(resource.getId(), index, modCallback, false); return modCallback; } catch (ACIDException e) { throw new HyracksDataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index dfd3eb1..da4aab8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -22,7 +22,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.AbstractOperationCallback; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; @@ -66,11 +65,11 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); - txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true); + txnCtx.register(resource.getId(), index, modCallback, true); return modCallback; } catch (ACIDException e) { throw new HyracksDataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java index fe758e1..672e881 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -83,7 +83,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime @Override public void open() throws HyracksDataException { try { - transactionContext = transactionManager.getTransactionContext(txnId, false); + transactionContext = transactionManager.getTransactionContext(txnId); transactionContext.setWriteTxn(isWriteTransaction); ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx); logRecord = new LogRecord(callback); @@ -111,9 +111,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too. */ - // TODO: Fix this for upserts. an upsert tuple right now expect to notify the opTracker twice (one for - // delete and one for insert) - transactionContext.notifyOptracker(false); + transactionContext.notifyEntityCommitted(); } else { tRef.reset(tAccess, t); try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 3d78ad9..6ebf52c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -234,27 +234,26 @@ public class LogBuffer implements ILogBuffer { private void batchUnlock(int beginOffset, int endOffset) throws ACIDException { if (endOffset > beginOffset) { logBufferTailReader.initializeScan(beginOffset, endOffset); - ITransactionContext txnCtx; - LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { if (logRecord.getLogSource() == LogSource.LOCAL) { if (logRecord.getLogType() == LogType.ENTITY_COMMIT) { reusableTxnId.setId(logRecord.getTxnId()); reusableDatasetId.setId(logRecord.getDatasetId()); - txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, false); + txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId); txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx); - txnCtx.notifyOptracker(false); + txnCtx.notifyEntityCommitted(); if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) { txnSubsystem.incrementEntityCommitCount(); } + } else if (logRecord.getLogType() == LogType.UPDATE) { + reusableTxnId.setId(logRecord.getTxnId()); + txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId); + txnCtx.notifyUpdateCommitted(logRecord.getResourceId()); } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { - reusableTxnId.setId(logRecord.getTxnId()); - txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, false); - txnCtx.notifyOptracker(true); notifyJobTermination(); } else if (logRecord.getLogType() == LogType.FLUSH) { notifyFlushTermination(); @@ -266,7 +265,6 @@ public class LogBuffer implements ILogBuffer { notifyReplicationTermination(); } } - logRecord = logBufferTailReader.next(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java new file mode 100644 index 0000000..43fe266 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.transaction; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.context.ITransactionOperationTracker; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.util.annotations.ThreadSafe; + +@ThreadSafe +public abstract class AbstractTransactionContext implements ITransactionContext { + + protected final TxnId txnId; + protected final Map<Long, ITransactionOperationTracker> txnOpTrackers; + private final AtomicLong firstLSN; + private final AtomicLong lastLSN; + private final AtomicInteger txnState; + private final AtomicBoolean isWriteTxn; + private boolean isTimeout = false; + + protected AbstractTransactionContext(TxnId txnId) { + this.txnId = txnId; + firstLSN = new AtomicLong(-1); + lastLSN = new AtomicLong(-1); + txnState = new AtomicInteger(ITransactionManager.ACTIVE); + isTimeout = false; + isWriteTxn = new AtomicBoolean(); + txnOpTrackers = new HashMap<>(); + } + + @Override + public long getFirstLSN() { + return firstLSN.get(); + } + + @Override + public void setLastLSN(long newValue) { + firstLSN.compareAndSet(-1, newValue); + lastLSN.set(Math.max(lastLSN.get(), newValue)); + } + + @Override + public void setTxnState(int txnState) { + this.txnState.set(txnState); + } + + @Override + public int getTxnState() { + return txnState.get(); + } + + @Override + public TxnId getTxnId() { + return txnId; + } + + @Override + public synchronized void setTimeout(boolean isTimeout) { + this.isTimeout = isTimeout; + } + + @Override + public synchronized boolean isTimeout() { + return isTimeout; + } + + @Override + public void setWriteTxn(boolean isWriteTxn) { + this.isWriteTxn.set(isWriteTxn); + } + + @Override + public boolean isWriteTxn() { + return isWriteTxn.get(); + } + + @Override + public long getLastLSN() { + return lastLSN.get(); + } + + @Override + public void complete() { + try { + if (txnState.get() == ITransactionManager.ABORTED) { + cleanupForAbort(); + } + } finally { + synchronized (txnOpTrackers) { + txnOpTrackers.forEach((resource, opTracker) -> opTracker.afterTransaction(resource)); + } + } + } + + @Override + public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, + boolean primaryIndex) { + synchronized (txnOpTrackers) { + if (!txnOpTrackers.containsKey(resourceId)) { + final ITransactionOperationTracker txnOpTracker = + (ITransactionOperationTracker) index.getOperationTracker(); + txnOpTrackers.put(resourceId, txnOpTracker); + txnOpTracker.beforeTransaction(resourceId); + } + } + } + + public String prettyPrint() { + StringBuilder sb = new StringBuilder(); + sb.append("\n" + txnId + "\n"); + sb.append("isWriteTxn: " + isWriteTxn + "\n"); + sb.append("firstLSN: " + firstLSN.get() + "\n"); + sb.append("lastLSN: " + lastLSN.get() + "\n"); + sb.append("TransactionState: " + txnState + "\n"); + sb.append("isTimeout: " + isTimeout + "\n"); + return sb.toString(); + } + + protected abstract void cleanupForAbort(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java new file mode 100644 index 0000000..1d132a8 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.transaction; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.util.annotations.ThreadSafe; + +@ThreadSafe +public class AtomicTransactionContext extends AbstractTransactionContext { + + private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>(); + private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>(); + private final Map<Long, IModificationOperationCallback> callbacks = new HashMap<>(); + + public AtomicTransactionContext(TxnId txnId) { + super(txnId); + } + + @Override + public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, + boolean primaryIndex) { + super.register(resourceId, index, callback, primaryIndex); + synchronized (txnOpTrackers) { + if (primaryIndex && !opTrackers.containsKey(resourceId)) { + opTrackers.put(resourceId, index.getOperationTracker()); + callbacks.put(resourceId, callback); + indexPendingOps.put(resourceId, new AtomicInteger(0)); + } + } + } + + @Override + public void notifyUpdateCommitted(long resourceId) { + try { + opTrackers.get(resourceId).completeOperation(null, LSMOperationType.MODIFICATION, null, + callbacks.get(resourceId)); + } catch (HyracksDataException e) { + throw new ACIDException(e); + } + } + + @Override + public void notifyEntityCommitted() { + throw new IllegalStateException("Unexpected entity commit in atomic transaction"); + } + + @Override + public void beforeOperation(long resourceId) { + indexPendingOps.get(resourceId).incrementAndGet(); + } + + @Override + public void afterOperation(long resourceId) { + indexPendingOps.get(resourceId).decrementAndGet(); + } + + @Override + public void cleanupForAbort() { + // each opTracker should be cleaned + opTrackers.forEach((resId, opTracker) -> ((PrimaryIndexOperationTracker) opTracker) + .cleanupNumActiveOperationsForAbortedJob(indexPendingOps.get(resId).get())); + } + + @Override + public int hashCode() { + return Long.hashCode(txnId.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AtomicTransactionContext that = (AtomicTransactionContext) o; + return this.txnId.equals(that.txnId); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java new file mode 100644 index 0000000..e195451 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.transaction; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.util.annotations.ThreadSafe; + +@ThreadSafe +public class EntityLevelTransactionContext extends AbstractTransactionContext { + + private PrimaryIndexOperationTracker primaryIndexOpTracker; + private IModificationOperationCallback primaryIndexCallback; + private final AtomicInteger pendingOps; + + public EntityLevelTransactionContext(TxnId txnId) { + super(txnId); + pendingOps = new AtomicInteger(0); + } + + @Override + public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, + boolean primaryIndex) { + super.register(resourceId, index, callback, primaryIndex); + synchronized (txnOpTrackers) { + if (primaryIndex && primaryIndexOpTracker == null) { + primaryIndexCallback = callback; + primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker(); + } + } + } + + @Override + public void beforeOperation(long resourceId) { + pendingOps.incrementAndGet(); + } + + @Override + public void notifyUpdateCommitted(long resourceId) { + // no op + } + + @Override + public void notifyEntityCommitted() { + try { + primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, primaryIndexCallback); + } catch (HyracksDataException e) { + throw new ACIDException(e); + } + } + + @Override + public void afterOperation(long resourceId) { + pendingOps.decrementAndGet(); + } + + @Override + protected void cleanupForAbort() { + if (primaryIndexOpTracker != null) { + primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get()); + } + } + + @Override + public int hashCode() { + return Long.hashCode(txnId.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EntityLevelTransactionContext that = (EntityLevelTransactionContext) o; + return this.txnId.equals(that.txnId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java deleted file mode 100644 index c408f1d..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.transaction.management.service.transaction; - -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; - -public class FieldsHashValueGenerator { - public static int computeFieldsHashValue(ITupleReference tuple, int[] fieldIndexes, - IBinaryHashFunction[] fieldHashFunctions) throws HyracksDataException { - int h = 0; - for (int i = 0; i < fieldIndexes.length; i++) { - int primaryKeyFieldIdx = fieldIndexes[i]; - int fh = fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx), - tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx)); - h = h * 31 + fh; - if (h < 0) { - h = h * (-1); - } - } - return h; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java deleted file mode 100644 index c8e134c..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -public class MutableResourceId { - long id; - - public MutableResourceId(long id) { - this.id = id; - } - - public void setId(long id) { - this.id = id; - } - - public long getId() { - return id; - } - - @Override - public int hashCode() { - return (int) id; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || !(o instanceof MutableResourceId)) { - return false; - } - return ((MutableResourceId) o).id == this.id; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java deleted file mode 100644 index 1681a27..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.asterix.common.context.ITransactionOperationTracker; -import org.apache.asterix.common.context.PrimaryIndexOperationTracker; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.AbstractOperationCallback; -import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.LogRecord; -import org.apache.asterix.common.transactions.TxnId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -import org.apache.hyracks.storage.common.IModificationOperationCallback; - -/* - * An object of TransactionContext is created and accessed(read/written) by multiple threads which work for - * a single job identified by a txnId. Thus, the member variables in the object can be read/written - * concurrently. Please see each variable declaration to know which one is accessed concurrently and - * which one is not. - */ -public class TransactionContext implements ITransactionContext { - - private static final long serialVersionUID = -6105616785783310111L; - - // txnId is set once and read concurrently. - private final TxnId txnId; - - // There are no concurrent writers on both firstLSN and lastLSN - // since both values are updated by serialized log appenders. - // But readers and writers can be different threads, - // so both LSNs are atomic variables in order to be read and written - // atomically. - private final AtomicLong firstLSN; - private final AtomicLong lastLSN; - - // txnState is read and written concurrently. - private final AtomicInteger txnState; - - // isTimeout is read and written under the lockMgr's tableLatch - // Thus, no other synchronization is required separately. - private boolean isTimeout; - - // isWriteTxn can be set concurrently by multiple threads. - private final AtomicBoolean isWriteTxn; - - // isMetadataTxn is accessed by a single thread since the metadata is not - // partitioned - private boolean isMetadataTxn; - - // indexMap is concurrently accessed by multiple threads, - // so those threads are synchronized on indexMap object itself - private final Map<Long, ITransactionOperationTracker> indexMap; - - // TODO: fix ComponentLSNs' issues. - // primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be - // modified accordingly - // when the issues of componentLSNs are fixed. - private ILSMIndex primaryIndex; - private AbstractOperationCallback primaryIndexCallback; - private PrimaryIndexOperationTracker primaryIndexOpTracker; - - // The following three variables are used as temporary variables in order to - // avoid object creations. - // Those are used in synchronized methods. - private final LogRecord logRecord; - - private final AtomicInteger transactorNumActiveOperations; - - // TODO: implement transactionContext pool in order to avoid object - // creations. - // also, the pool can throttle the number of concurrent active jobs at every - // moment. - public TransactionContext(TxnId txnId) throws ACIDException { - this.txnId = txnId; - firstLSN = new AtomicLong(-1); - lastLSN = new AtomicLong(-1); - txnState = new AtomicInteger(ITransactionManager.ACTIVE); - isTimeout = false; - isWriteTxn = new AtomicBoolean(false); - isMetadataTxn = false; - indexMap = new HashMap<>(); - primaryIndex = null; - logRecord = new LogRecord(); - transactorNumActiveOperations = new AtomicInteger(0); - } - - @Override - public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback, - boolean isPrimaryIndex) { - synchronized (indexMap) { - if (isPrimaryIndex && primaryIndex == null) { - primaryIndex = index; - primaryIndexCallback = callback; - primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker(); - } - if (!indexMap.containsKey(resourceId)) { - final ITransactionOperationTracker txnOpTracker = - (ITransactionOperationTracker) index.getOperationTracker(); - indexMap.put(resourceId, txnOpTracker); - txnOpTracker.beforeTransaction(resourceId); - } - } - } - - public PrimaryIndexOperationTracker getPrimaryIndexOpTracker() { - synchronized (indexMap) { - return primaryIndexOpTracker; - } - } - - // [Notice] - // This method is called sequentially by the LogAppender threads. - @Override - public void setLastLSN(long LSN) { - firstLSN.compareAndSet(-1, LSN); - lastLSN.set(Math.max(lastLSN.get(), LSN)); - } - - @Override - public void notifyOptracker(boolean isJobLevelCommit) { - try { - /** - * in case of transaction abort {@link TransactionContext#cleanupForAbort()} will - * clean the primaryIndexOpTracker state. - */ - if (isJobLevelCommit && isMetadataTxn && txnState.get() != ITransactionManager.ABORTED) { - primaryIndexOpTracker.exclusiveJobCommitted(); - } else if (!isJobLevelCommit) { - primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, - (IModificationOperationCallback) primaryIndexCallback); - } - } catch (HyracksDataException e) { - throw new IllegalStateException(e); - } - } - - @Override - public void setWriteTxn(boolean isWriteTxn) { - this.isWriteTxn.set(isWriteTxn); - } - - @Override - public boolean isWriteTxn() { - return isWriteTxn.get(); - } - - @Override - public long getFirstLSN() { - return firstLSN.get(); - } - - @Override - public long getLastLSN() { - return lastLSN.get(); - } - - @Override - public TxnId getTxnId() { - return txnId; - } - - @Override - public void setTimeout(boolean isTimeout) { - this.isTimeout = isTimeout; - } - - @Override - public boolean isTimeout() { - return isTimeout; - } - - @Override - public void setTxnState(int txnState) { - this.txnState.set(txnState); - } - - @Override - public int getTxnState() { - return txnState.get(); - } - - @Override - public int hashCode() { - return Long.hashCode(txnId.getId()); - } - - @Override - public boolean equals(Object o) { - return (o == this); - } - - @Override - public void setMetadataTransaction(boolean isMetadataTxn) { - this.isMetadataTxn = isMetadataTxn; - } - - @Override - public boolean isMetadataTransaction() { - return isMetadataTxn; - } - - @Override - public String prettyPrint() { - StringBuilder sb = new StringBuilder(); - sb.append("\n" + txnId + "\n"); - sb.append("isWriteTxn: " + isWriteTxn + "\n"); - sb.append("firstLSN: " + firstLSN.get() + "\n"); - sb.append("lastLSN: " + lastLSN.get() + "\n"); - sb.append("TransactionState: " + txnState + "\n"); - sb.append("isTimeout: " + isTimeout + "\n"); - return sb.toString(); - } - - public LogRecord getLogRecord() { - return logRecord; - } - - @Override - public void incrementNumActiveOperations() { - transactorNumActiveOperations.incrementAndGet(); - } - - @Override - public void decrementNumActiveOperations() { - transactorNumActiveOperations.decrementAndGet(); - } - - @Override - public void complete() { - try { - if (txnState.get() == ITransactionManager.ABORTED) { - cleanupForAbort(); - } - } finally { - synchronized (indexMap) { - indexMap.forEach((resource, opTracker) -> opTracker.afterTransaction(resource)); - } - } - } - - private void cleanupForAbort() { - if (primaryIndexOpTracker != null) { - primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java new file mode 100644 index 0000000..4a465a4 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.transaction; + +import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel; + +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.common.transactions.TxnId; + +public class TransactionContextFactory { + + private TransactionContextFactory() { + } + + public static ITransactionContext create(TxnId txnId, TransactionOptions options) { + final AtomicityLevel atomicityLevel = options.getAtomicityLevel(); + switch (atomicityLevel) { + case ATOMIC: + return new AtomicTransactionContext(txnId); + case ENTITY_LEVEL: + return new EntityLevelTransactionContext(txnId); + default: + throw new IllegalStateException("Unknown transaction context type: " + atomicityLevel); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index 1799ea1..c03369b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.transaction.management.service.transaction; +import java.io.IOException; import java.io.OutputStream; import java.util.Map; import java.util.Set; @@ -27,124 +28,93 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.util.annotations.ThreadSafe; -/** - * An implementation of the @see ITransactionManager interface that provides - * implementation of APIs for governing the lifecycle of a transaction. - */ +@ThreadSafe public class TransactionManager implements ITransactionManager, ILifeCycleComponent { - public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName()); private final ITransactionSubsystem txnSubsystem; - private Map<TxnId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<>(); - private AtomicLong maxTxnId = new AtomicLong(0); + private final Map<TxnId, ITransactionContext> txnCtxRepository = new ConcurrentHashMap<>(); + private final AtomicLong maxTxnId = new AtomicLong(0); public TransactionManager(ITransactionSubsystem provider) { this.txnSubsystem = provider; } @Override - public void abortTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException { - if (txnCtx.getTxnState() != ITransactionManager.ABORTED) { - txnCtx.setTxnState(ITransactionManager.ABORTED); - } - try { - if (txnCtx.isWriteTxn()) { - LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord(); - TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false); - txnSubsystem.getLogManager().log(logRecord); - txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx); - } - } catch (Exception ae) { - String msg = "Could not complete rollback! System is in an inconsistent state"; - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.severe(msg); - } - ae.printStackTrace(); - throw new ACIDException(msg, ae); - } finally { - txnCtx.complete(); - txnSubsystem.getLockManager().releaseLocks(txnCtx); - transactionContextRepository.remove(txnCtx.getTxnId()); + public synchronized ITransactionContext beginTransaction(TxnId txnId, TransactionOptions options) + throws ACIDException { + ITransactionContext txnCtx = txnCtxRepository.get(txnId); + if (txnCtx != null) { + throw new ACIDException("Transaction with the same (" + txnId + ") already exists"); } + txnCtx = TransactionContextFactory.create(txnId, options); + txnCtxRepository.put(txnId, txnCtx); + ensureMaxTxnId(txnId.getId()); + return txnCtx; } @Override - public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException { - return getTransactionContext(txnId, true); - } - - @Override - public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException { - setMaxTxnId(txnId.getId()); - ITransactionContext txnCtx = transactionContextRepository.get(txnId); + public ITransactionContext getTransactionContext(TxnId txnId) throws ACIDException { + ITransactionContext txnCtx = txnCtxRepository.get(txnId); if (txnCtx == null) { - if (createIfNotExist) { - synchronized (this) { - txnCtx = transactionContextRepository.get(txnId); - if (txnCtx == null) { - txnCtx = new TransactionContext(txnId); - transactionContextRepository.put(txnId, txnCtx); - } - } - } else { - throw new ACIDException("TransactionContext of " + txnId + " doesn't exist."); - } + throw new ACIDException("Transaction " + txnId + " doesn't exist."); } return txnCtx; } @Override - public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) - throws ACIDException { - //Only job-level commits call this method. + public void commitTransaction(TxnId txnId) throws ACIDException { + final ITransactionContext txnCtx = getTransactionContext(txnId); try { if (txnCtx.isWriteTxn()) { - LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord(); + LogRecord logRecord = new LogRecord(); TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true); txnSubsystem.getLogManager().log(logRecord); + txnCtx.setTxnState(ITransactionManager.COMMITTED); } - } catch (Exception ae) { + } catch (Exception e) { if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.severe(" caused exception in commit !" + txnCtx.getTxnId()); } - throw ae; + throw e; } finally { txnCtx.complete(); txnSubsystem.getLockManager().releaseLocks(txnCtx); - transactionContextRepository.remove(txnCtx.getTxnId()); - txnCtx.setTxnState(ITransactionManager.COMMITTED); + txnCtxRepository.remove(txnCtx.getTxnId()); } } @Override - public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal, - boolean success) throws ACIDException { - if (!success) { - abortTransaction(txnContext, datasetId, PKHashVal); - } else { - commitTransaction(txnContext, datasetId, PKHashVal); - } - } - - @Override - public ITransactionSubsystem getTransactionSubsystem() { - return txnSubsystem; - } - - public void setMaxTxnId(long txnId) { - long maxId = maxTxnId.get(); - if (txnId > maxId) { - maxTxnId.compareAndSet(maxId, txnId); + public void abortTransaction(TxnId txnId) throws ACIDException { + final ITransactionContext txnCtx = getTransactionContext(txnId); + try { + if (txnCtx.isWriteTxn()) { + LogRecord logRecord = new LogRecord(); + TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false); + txnSubsystem.getLogManager().log(logRecord); + txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx); + txnCtx.setTxnState(ITransactionManager.ABORTED); + } + } catch (ACIDException e) { + String msg = "Could not complete rollback! System is in an inconsistent state"; + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, msg, e); + } + throw new ACIDException(msg, e); + } finally { + txnCtx.complete(); + txnSubsystem.getLockManager().releaseLocks(txnCtx); + txnCtxRepository.remove(txnCtx.getTxnId()); } } @@ -167,45 +137,41 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon @Override public void dumpState(OutputStream os) { - //#. dump TxnContext dumpTxnContext(os); } + private void ensureMaxTxnId(long txnId) { + maxTxnId.updateAndGet(current -> Math.max(current, txnId)); + } + private void dumpTxnContext(OutputStream os) { TxnId txnId; ITransactionContext txnCtx; StringBuilder sb = new StringBuilder(); - try { sb.append("\n>>dump_begin\t>>----- [ConfVars] -----"); - Set<Map.Entry<TxnId, ITransactionContext>> entrySet = transactionContextRepository.entrySet(); - if (entrySet != null) { - for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) { - if (entry != null) { - txnId = entry.getKey(); - if (txnId != null) { - sb.append("\n" + txnId); - } else { - sb.append("\nJID:null"); - } - - txnCtx = entry.getValue(); - if (txnCtx != null) { - sb.append(txnCtx.prettyPrint()); - } else { - sb.append("\nTxnCtx:null"); - } + Set<Map.Entry<TxnId, ITransactionContext>> entrySet = txnCtxRepository.entrySet(); + for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) { + if (entry != null) { + txnId = entry.getKey(); + if (txnId != null) { + sb.append("\n" + txnId); + } else { + sb.append("\nJID:null"); + } + + txnCtx = entry.getValue(); + if (txnCtx != null) { + sb.append(((AbstractTransactionContext) txnCtx).prettyPrint()); + } else { + sb.append("\nTxnCtx:null"); } } } - sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n"); os.write(sb.toString().getBytes()); - } catch (Exception e) { - //ignore exception and continue dumping as much as possible. - if (IS_DEBUG_MODE) { - e.printStackTrace(); - } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "exception while dumping state", e); } } }
