Repository: asterixdb Updated Branches: refs/heads/master e5a65429d -> 2c04ae075
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c04ae07/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java index 672e881..d298bef 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -27,9 +27,9 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.api.comm.IFrame; @@ -53,7 +53,6 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; - protected final boolean isTemporaryDatasetWriteJob; protected final boolean isWriteTransaction; protected final long[] longHashes; protected final IHyracksTaskContext ctx; @@ -63,7 +62,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime protected final boolean isSink; public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields, - boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) { + boolean isWriteTransaction, int resourcePartition, boolean isSink) { this.ctx = ctx; INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); @@ -73,7 +72,6 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.tRef = new FrameTupleReference(); - this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; this.isWriteTransaction = isWriteTransaction; this.resourcePartition = resourcePartition; this.isSink = isSink; @@ -102,27 +100,15 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime tAccess.reset(buffer); int nTuple = tAccess.getTupleCount(); for (int t = 0; t < nTuple; t++) { - if (isTemporaryDatasetWriteJob) { - /** - * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock - * and does not generate any write-ahead update and commit log but generates flush log and job commit - * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call - * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of - * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing - * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too. - */ - transactionContext.notifyEntityCommitted(); - } else { - tRef.reset(tAccess, t); - try { - formLogRecord(buffer, t); - logMgr.log(logRecord); - if (!isSink) { - appendTupleToFrame(t); - } - } catch (ACIDException e) { - throw new HyracksDataException(e); + tRef.reset(tAccess, t); + try { + formLogRecord(buffer, t); + logMgr.log(logRecord); + if (!isSink) { + appendTupleToFrame(t); } + } catch (ACIDException e) { + throw new HyracksDataException(e); } } IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c04ae07/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 58f7e69..91db197 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -34,17 +34,15 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; - protected final boolean isTemporaryDatasetWriteJob; protected final boolean isWriteTransaction; protected int[] datasetPartitions; protected final boolean isSink; - public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, - boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) { + public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isWriteTransaction, + int[] datasetPartitions, boolean isSink) { this.txnId = txnId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; - this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; this.isWriteTransaction = isWriteTransaction; this.datasetPartitions = datasetPartitions; this.isSink = isSink; @@ -58,8 +56,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { @Override public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); - return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, - primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, - datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, primaryKeyFields, + isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); } }