Enhanced Insert AQL

The optional "as Variable" provides a variable binding for the inserted records.
The optional "returning Query" allows users to run simple
queries/functions on the records returned by the insert, and can refer
to the variable bound in "as Variable".

Allow commits to be non-sink operators (continue job pipeline after commit)

Additionally, this change makes small modifications to
the extension code to prepare for the BAD extension

Also made the OptimizerTests able to work for Extensions

Change-Id: I65789d2a861d15232dd29156a6987d0635ec6c94
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1150
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/afa909a5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/afa909a5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/afa909a5

Branch: refs/heads/master
Commit: afa909a5794e1f24da4d619119ca2bc58f486959
Parents: e9b2adf
Author: Steven Glenn Jacobs <sjaco...@ucr.edu>
Authored: Sun Oct 16 10:35:30 2016 -0700
Committer: Steven Jacobs <sjaco...@ucr.edu>
Committed: Sun Oct 16 11:07:33 2016 -0700

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |  18 ++
 .../active/IActiveEntityEventsListener.java     |   2 +-
 .../algebra/extension/IExtensionStatement.java  |   9 +-
 .../algebra/operators/CommitOperator.java       |  39 +++-
 .../operators/physical/CommitPOperator.java     |   6 +-
 .../operators/physical/CommitRuntime.java       |  74 +++---
 .../physical/CommitRuntimeFactory.java          |   8 +-
 .../operators/physical/UpsertCommitRuntime.java |  16 +-
 .../asterix/optimizer/base/RuleCollections.java |   4 +-
 .../rules/IntroduceAutogenerateIDRule.java      |  37 ++-
 .../rules/IntroduceDynamicTypeCastRule.java     |  42 ++--
 ...troduceRapidFrameFlushProjectAssignRule.java |   6 +-
 ...IntroduceSecondaryIndexInsertDeleteRule.java | 109 +++++----
 .../IntroduceStaticTypeCastForInsertRule.java   |  11 +-
 .../rules/ReplaceSinkOpWithCommitOpRule.java    | 166 -------------
 .../rules/SetupCommitExtensionOpRule.java       | 168 +++++++++++++
 .../SweepIllegalNonfunctionalFunctions.java     |   4 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   2 +-
 .../am/AbstractIntroduceAccessMethodRule.java   | 132 +++++------
 .../subplan/InlineAllNtsInSubplanVisitor.java   |   4 +-
 ...neLeftNtsInSubplanJoinFlatteningVisitor.java |   4 +-
 .../SubplanSpecialFlatteningCheckVisitor.java   |   4 +-
 .../asterix/translator/CompiledStatements.java  |  20 +-
 .../asterix/translator/IStatementExecutor.java  |   4 +-
 .../LangExpressionToPlanTranslator.java         | 198 +++++++++++-----
 .../asterix/api/http/servlet/FeedServlet.java   |  52 -----
 .../asterix/app/translator/QueryTranslator.java | 234 ++++++++++---------
 .../app/bootstrap/TestNodeController.java       |   2 +-
 .../asterix/test/optimizer/OptimizerTest.java   |  15 +-
 .../asterix/test/runtime/ExecutionTestUtil.java |  11 +-
 .../queries/insert-return-custom-result.aql     |  48 ++++
 .../results/insert-return-custom-result.plan    |  17 ++
 .../insert-return-records.1.ddl.aql             |  36 +++
 .../insert-return-records.3.query.aql           |  34 +++
 .../insert-returning-fieldname.1.ddl.aql        |  37 +++
 .../insert-returning-fieldname.3.query.aql      |  30 +++
 .../insert-with-bad-return.1.ddl.aql            |  36 +++
 .../insert-with-bad-return.3.query.aql          |  37 +++
 .../upsert-return-custom-result.1.ddl.aql       |  37 +++
 .../upsert-return-custom-result.3.query.aql     |  43 ++++
 .../insert-return-records.1.adm                 |   5 +
 .../insert-returning-fieldname.1.adm            |   1 +
 .../upsert-return-custom-result.1.adm           |   5 +
 .../src/test/resources/runtimets/testsuite.xml  |  21 ++
 .../asterix-doc/src/site/markdown/aql/manual.md |  10 +-
 .../api/IActiveLifecycleEventSubscriber.java    |  40 ++++
 .../feed/api/IFeedLifecycleEventSubscriber.java |  37 ---
 .../ActiveLifecycleEventSubscriber.java         |  67 ++++++
 .../feed/management/FeedEventsListener.java     |  32 +--
 .../FeedLifecycleEventSubscriber.java           |  66 ------
 .../external/feed/watch/FeedActivity.java       | 116 ---------
 .../feed/watch/FeedActivityDetails.java         |  27 +++
 .../aql/statement/SubscribeFeedStatement.java   |  12 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  16 +-
 .../lang/common/statement/InsertStatement.java  |  37 ++-
 .../lang/common/statement/UpsertStatement.java  |   6 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |   2 +-
 .../org/apache/asterix/om/base/ADateTime.java   |  19 +-
 .../java/org/apache/asterix/om/base/AUUID.java  |  18 +-
 .../core/algebra/base/LogicalOperatorTag.java   |   2 +-
 .../AbstractDelegatedLogicalOperator.java       |  60 +++++
 .../AbstractExtensibleLogicalOperator.java      |  60 -----
 .../operators/logical/DelegateOperator.java     | 125 ++++++++++
 .../operators/logical/ExtensionOperator.java    | 124 ----------
 .../operators/logical/IOperatorDelegate.java    |  54 +++++
 .../operators/logical/IOperatorExtension.java   |  54 -----
 .../visitors/CardinalityInferenceVisitor.java   |   4 +-
 .../visitors/FDsAndEquivClassesVisitor.java     |   4 +-
 .../visitors/IsomorphismOperatorVisitor.java    |   8 +-
 .../IsomorphismVariableMappingVisitor.java      |   4 +-
 ...OperatorDeepCopyWithNewVariablesVisitor.java |   4 +-
 .../visitors/LogicalPropertiesVisitor.java      |   4 +-
 .../visitors/OperatorDeepCopyVisitor.java       |   6 +-
 .../visitors/PrimaryKeyVariablesVisitor.java    |   4 +-
 .../visitors/ProducedVariableVisitor.java       |   4 +-
 .../logical/visitors/SchemaVariableVisitor.java |   4 +-
 .../visitors/SubstituteVariableVisitor.java     |   4 +-
 .../logical/visitors/UsedVariableVisitor.java   |   4 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |   4 +-
 .../visitors/ILogicalOperatorVisitor.java       |   4 +-
 ...placeNtsWithSubplanInputOperatorVisitor.java |   4 +-
 81 files changed, 1680 insertions(+), 1158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 9f4e650..efba47e 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -1,3 +1,21 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2dd9fe7..d0fb5e8 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,6 +31,6 @@ public interface IActiveEntityEventsListener {
 
     public EntityId getEntityId();
 
-    public boolean isEntityConnectedToDataset(String dataverseName, String 
datasetName);
+    public boolean isEntityUsingDataset(String dataverseName, String 
datasetName);
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index e88962a..d15ae6f 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -21,8 +21,11 @@ package org.apache.asterix.algebra.extension;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -36,15 +39,17 @@ public interface IExtensionStatement extends Statement {
     }
 
     /**
-     * Called when the {@code IQueryTranslator} encounters an extension 
statement.
+     * Called when the {@code IStatementExecutor} encounters an extension 
statement.
      * An implementation class should implement the actual processing of the 
statement in this method.
      *
      * @param queryTranslator
      * @param metadataProvider
      * @param statementExecutor
      * @param hcc
+     * @param resultSetIdCounter
      * @throws Exception
      */
     void handle(IStatementExecutor statementExecutor, AqlMetadataProvider 
metadataProvider,
-            IHyracksClientConnection hcc) throws HyracksDataException, 
AlgebricksException;
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 8dd6d33..1ee130c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -19,23 +19,32 @@
 
 package org.apache.asterix.algebra.operators;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
-public class CommitOperator extends AbstractExtensibleLogicalOperator {
+public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
-    private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final LogicalVariable upsertVar;
+    private List<LogicalVariable> primaryKeyLogicalVars;
+    private LogicalVariable upsertVar;
+    private boolean isSink;
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, 
LogicalVariable upsertVar) {
+    public CommitOperator(boolean isSink) {
+        this.isSink = isSink;
+        this.upsertVar = null;
+        primaryKeyLogicalVars = new ArrayList<>();
+    }
+
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, 
LogicalVariable upsertVar, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
+        this.isSink = isSink;
     }
 
     @Override
@@ -44,9 +53,23 @@ public class CommitOperator extends 
AbstractExtensibleLogicalOperator {
         return false;
     }
 
+    public boolean isSink() {
+        return isSink;
+    }
+
+    public void setSink(boolean isSink) {
+        this.isSink = isSink;
+    }
+
+    //Provided for Extensions but not used by core
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, 
LogicalVariable upsertVar) {
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+        this.upsertVar = upsertVar;
+    }
+
     @Override
-    public IOperatorExtension newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar);
+    public IOperatorDelegate newInstance() {
+        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1a26021..5a1c929 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -47,15 +47,17 @@ public class CommitPOperator extends 
AbstractPhysicalOperator {
     private final String dataverse;
     private final String dataset;
     private final LogicalVariable upsertVar;
+    private final boolean isSink;
 
     public CommitPOperator(JobId jobId, String dataverse, String dataset, int 
datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable 
upsertVar) {
+            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable 
upsertVar, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
         this.dataverse = dataverse;
         this.dataset = dataset;
+        this.isSink = isSink;
     }
 
     @Override
@@ -105,7 +107,7 @@ public class CommitPOperator extends 
AbstractPhysicalOperator {
         }
         runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
                 metadataProvider.isTemporaryDatasetWriteJob(), 
metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index c6c71f6..a1fa788 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -31,8 +31,7 @@ import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +44,7 @@ import 
org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
-public class CommitRuntime implements IPushRuntime {
+public class CommitRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
 
     private final static long SEED = 0L;
 
@@ -57,27 +56,27 @@ public class CommitRuntime implements IPushRuntime {
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final FrameTupleReference frameTupleReference;
     protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
     protected ITransactionContext transactionContext;
     protected LogRecord logRecord;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final boolean isSink;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, 
int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition, boolean isSink) {
         this.ctx = ctx;
-        IAsterixAppRuntimeContext runtimeCtx =
-                (IAsterixAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) 
ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
         this.transactionManager = 
runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
-        this.frameTupleReference = new FrameTupleReference();
+        this.tRef = new FrameTupleReference();
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
+        this.isSink = isSink;
         longHashes = new long[2];
     }
 
@@ -86,9 +85,14 @@ public class CommitRuntime implements IPushRuntime {
         try {
             transactionContext = 
transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtils.<ILogMarkerCallback> 
get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtils.<ILogMarkerCallback> 
get(ILogMarkerCallback.KEY_MARKER_CALLBACK,
+                    ctx);
             logRecord = new LogRecord(callback);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            writer.open();
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -96,26 +100,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        frameTupleAccessor.reset(buffer);
-        int nTuple = frameTupleAccessor.getTupleCount();
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
             if (isTemporaryDatasetWriteJob) {
                 /**
-                 * This "if branch" is for writes over temporary datasets.
-                 * A temporary dataset does not require any lock and does not 
generate any write-ahead
-                 * update and commit log but generates flush log and job 
commit log.
-                 * However, a temporary dataset still MUST guarantee no-steal 
policy so that this
-                 * notification call should be delivered to 
PrimaryIndexOptracker and used correctly in order
-                 * to decrement number of active operation count of 
PrimaryIndexOptracker.
-                 * By maintaining the count correctly and only allowing 
flushing when the count is 0, it can
-                 * guarantee the no-steal policy for temporary datasets, too.
+                 * This "if branch" is for writes over temporary datasets. A 
temporary dataset does not require any lock
+                 * and does not generate any write-ahead update and commit log 
but generates flush log and job commit
+                 * log. However, a temporary dataset still MUST guarantee 
no-steal policy so that this notification call
+                 * should be delivered to PrimaryIndexOptracker and used 
correctly in order to decrement number of
+                 * active operation count of PrimaryIndexOptracker. By 
maintaining the count correctly and only allowing
+                 * flushing when the count is 0, it can guarantee the no-steal 
policy for temporary datasets, too.
                  */
                 transactionContext.notifyOptracker(false);
             } else {
-                frameTupleReference.reset(frameTupleAccessor, t);
+                tRef.reset(tAccess, t);
                 try {
                     formLogRecord(buffer, t);
                     logMgr.log(logRecord);
+                    if (!isSink) {
+                        appendTupleToFrame(t);
+                    }
                 } catch (ACIDException e) {
                     throw new HyracksDataException(e);
                 }
@@ -141,8 +146,8 @@ public class CommitRuntime implements IPushRuntime {
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        int pkHash = computePrimaryKeyHashValue(frameTupleReference, 
primaryKeyFields);
-        TransactionUtil.formEntityCommitLogRecord(logRecord, 
transactionContext, datasetId, pkHash, frameTupleReference,
+        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        TransactionUtil.formEntityCommitLogRecord(logRecord, 
transactionContext, datasetId, pkHash, tRef,
                 primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
     }
 
@@ -153,22 +158,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void fail() throws HyracksDataException {
-
+        failed = true;
+        if (isSink) {
+            return;
+        }
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-
-    }
-
-    @Override
-    public void setFrameWriter(int index, IFrameWriter writer, 
RecordDescriptor recordDesc) {
-        throw new IllegalStateException();
+        if (isSink) {
+            return;
+        }
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor 
recordDescriptor) {
-        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index 4f28b9d..9486a19 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -36,9 +36,10 @@ public class CommitRuntimeFactory implements 
IPushRuntimeFactory {
     private final boolean isWriteTransaction;
     private final int upsertVarIdx;
     private int[] datasetPartitions;
+    private final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] 
primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] 
datasetPartitions) {
+            boolean isWriteTransaction, int upsertVarIdx, int[] 
datasetPartitions, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
@@ -46,6 +47,7 @@ public class CommitRuntimeFactory implements 
IPushRuntimeFactory {
         this.isWriteTransaction = isWriteTransaction;
         this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
+        this.isSink = isSink;
     }
 
     @Override
@@ -58,10 +60,10 @@ public class CommitRuntimeFactory implements 
IPushRuntimeFactory {
         if (upsertVarIdx >= 0) {
             return new UpsertCommitRuntime(ctx, jobId, datasetId, 
primaryKeyFields, isTemporaryDatasetWriteJob,
                     isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx);
+                    upsertVarIdx, isSink);
         } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, 
isTemporaryDatasetWriteJob,
-                    isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()]);
+                    isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
index 7358700..53e0f62 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
@@ -30,25 +30,25 @@ public class UpsertCommitRuntime extends CommitRuntime {
     private final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition, int upsertIdx) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition, int upsertIdx,
+            boolean isSink) {
         super(ctx, jobId, datasetId, primaryKeyFields, 
isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition);
+                resourcePartition, isSink);
         this.upsertIdx = upsertIdx;
     }
 
     @Override
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = 
ABooleanSerializerDeserializer.getBoolean(buffer.array(),
-                frameTupleAccessor.getFieldSlotsLength() + 
frameTupleAccessor.getTupleStartOffset(t)
-                        + frameTupleAccessor.getFieldStartOffset(t, upsertIdx) 
+ 1);
+        boolean isNull = 
ABooleanSerializerDeserializer.getBoolean(buffer.array(), 
tAccess.getFieldSlotsLength()
+                + tAccess.getTupleStartOffset(t) + 
tAccess.getFieldStartOffset(t, upsertIdx) + 1);
         if (isNull) {
             // Previous record not found (insert)
             super.formLogRecord(buffer, t);
         } else {
             // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(frameTupleReference, 
primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, 
transactionContext, datasetId, pkHash,
-                    frameTupleReference, primaryKeyFields, resourcePartition, 
LogType.UPSERT_ENTITY_COMMIT);
+            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, 
transactionContext, datasetId, pkHash, tRef,
+                    primaryKeyFields, resourcePartition, 
LogType.UPSERT_ENTITY_COMMIT);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index cd8d747..56a3bd0 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -66,7 +66,7 @@ import 
org.apache.asterix.optimizer.rules.RemoveRedundantListifyRule;
 import org.apache.asterix.optimizer.rules.RemoveRedundantSelectRule;
 import org.apache.asterix.optimizer.rules.RemoveSortInFeedIngestionRule;
 import org.apache.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
-import org.apache.asterix.optimizer.rules.ReplaceSinkOpWithCommitOpRule;
+import org.apache.asterix.optimizer.rules.SetupCommitExtensionOpRule;
 import org.apache.asterix.optimizer.rules.ResolveVariableRule;
 import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
@@ -317,7 +317,7 @@ public final class RuleCollections {
         List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new 
LinkedList<>();
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
         //Turned off the following rule for now not to change OptimizerTest 
results.
-        physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
+        physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new 
SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new 
AddEquivalenceClassForRecordConstructorRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index fee1c96..c8d2760 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -18,9 +18,11 @@
  */
 package org.apache.asterix.optimizer.rules;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -44,6 +46,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,13 +65,35 @@ public class IntroduceAutogenerateIDRule implements 
IAlgebraicRewriteRule {
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
 
-        // match: [insert to internal dataset with autogenerated id] - assign 
- project
+        // match: commit OR distribute-result OR SINK - ... followed by:
+        // [insert to internal dataset with autogenerated id] - assign - 
project
         // produce: insert - assign - assign* - project
         // **
         // OR [insert to internal dataset with autogenerated id] - assign - 
[datasource scan]
         // produce insert - assign - assign* - datasource scan
 
         AbstractLogicalOperator currentOp = (AbstractLogicalOperator) 
opRef.getValue();
+        if (currentOp.getOperatorTag() == 
LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator dOp = (DelegateOperator) currentOp;
+            if (!(dOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            } else if (!((CommitOperator) dOp.getDelegate()).isSink()) {
+                return false;
+            }
+
+        } else if (currentOp.getOperatorTag() != 
LogicalOperatorTag.DISTRIBUTE_RESULT
+                && currentOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        ArrayDeque<AbstractLogicalOperator> opStack = new ArrayDeque<>();
+        opStack.push(currentOp);
+        while (currentOp.getInputs().size() == 1) {
+            currentOp = (AbstractLogicalOperator) 
currentOp.getInputs().get(0).getValue();
+            if (currentOp.getOperatorTag() == 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                break;
+            }
+            opStack.push(currentOp);
+        }
         if (currentOp.getOperatorTag() != 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
@@ -95,7 +120,8 @@ public class IntroduceAutogenerateIDRule implements 
IAlgebraicRewriteRule {
         AssignOperator assignOp = (AssignOperator) parentOp;
         LogicalVariable inputRecord;
 
-        //bug here. will not work for internal datasets with filters since the 
pattern becomes [project-assign-assign-insert] <-this should be fixed->
+        //TODO: bug here. This rule will not work for internal datasets with filters, 
since the pattern becomes
+        //[project-assign-assign-insert]
         AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) 
parentOp.getInputs().get(0).getValue();
         if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
             ProjectOperator projectOp = (ProjectOperator) grandparentOp;
@@ -122,7 +148,12 @@ public class IntroduceAutogenerateIDRule implements 
IAlgebraicRewriteRule {
         VariableUtilities.substituteVariables(insertOp, inputRecord, v, 
context);
         context.computeAndSetTypeEnvironmentForOperator(newAssign);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);;
+        for (AbstractLogicalOperator op : opStack) {
+            VariableUtilities.substituteVariables(op, inputRecord, v, context);
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index f6cd015..28bcc7f 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -46,6 +47,8 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -90,11 +93,19 @@ public class IntroduceDynamicTypeCastRule implements 
IAlgebraicRewriteRule {
         // We identify INSERT and DISTRIBUTE_RESULT operators.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) 
opRef.getValue();
         switch (op1.getOperatorTag()) {
-            case SINK: {
+            case SINK:
+            case DELEGATE_OPERATOR: {
                 /**
-                 * pattern match: sink insert assign
-                 * resulting plan: sink-insert-project-assign
+                 * pattern match: (sink | commit) insert assign
+                 * resulting plan: (sink | commit)-insert-project-assign
                  */
+                if (op1.getOperatorTag() == 
LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator eOp = (DelegateOperator) op1;
+                    if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                        return false;
+                    }
+                }
+
                 AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
                 if (op2.getOperatorTag() == 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                     InsertDeleteUpsertOperator insertDeleteOp = 
(InsertDeleteUpsertOperator) op2;
@@ -131,21 +142,8 @@ public class IntroduceDynamicTypeCastRule implements 
IAlgebraicRewriteRule {
                 // Remember this is the operator we need to modify
                 op = op1;
 
-                // The Variable we want is the (hopefully singular, hopefully 
record-typed) live variable
-                // of the singular input operator of the DISTRIBUTE_RESULT
-                if (op.getInputs().size() > 1) {
-                    // Hopefully not possible?
-                    throw new AlgebricksException(
-                            "output-record-type defined for expression with 
multiple input operators");
-                }
-                AbstractLogicalOperator input = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
-                List<LogicalVariable> liveVars = new ArrayList<>();
-                VariableUtilities.getLiveVariables(input, liveVars);
-                if (liveVars.size() > 1) {
-                    throw new AlgebricksException(
-                            "Expression with multiple fields cannot be cast to 
output-record-type!");
-                }
-                recordVar = liveVars.get(0);
+                recordVar = ((VariableReferenceExpression) 
((DistributeResultOperator) op).getExpressions().get(0)
+                        .getValue()).getVariableReference();
                 break;
             }
             default: {
@@ -210,15 +208,15 @@ public class IntroduceDynamicTypeCastRule implements 
IAlgebraicRewriteRule {
                 if (var.equals(recordVar)) {
                     /** insert an assign operator to call the function 
on-top-of the variable */
                     IAType actualType = (IAType) env.getVarType(var);
-                    AbstractFunctionCallExpression cast =
-                            new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fd));
+                    AbstractFunctionCallExpression cast = new 
ScalarFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(fd));
                     cast.getArguments()
                             .add(new MutableObject<ILogicalExpression>(new 
VariableReferenceExpression(var)));
                     /** enforce the required record type */
                     TypeCastUtils.setRequiredAndInputTypes(cast, 
requiredRecordType, actualType);
                     LogicalVariable newAssignVar = context.newVar();
-                    AssignOperator newAssignOperator =
-                            new AssignOperator(newAssignVar, new 
MutableObject<ILogicalExpression>(cast));
+                    AssignOperator newAssignOperator = new 
AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
                     newAssignOperator.getInputs().add(new 
MutableObject<ILogicalOperator>(op));
                     opRef.setValue(newAssignOperator);
                     
context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index a8368a7..f655b24 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -27,7 +27,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -50,10 +50,10 @@ public class IntroduceRapidFrameFlushProjectAssignRule 
implements IAlgebraicRewr
     }
 
     private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator extensionOp = (ExtensionOperator) op;
+        DelegateOperator extensionOp = (DelegateOperator) op;
         if (!(extensionOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c487a96..8362dd7 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -66,6 +67,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
@@ -90,20 +92,28 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) 
opRef.getValue();
-        if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) 
opRef.getValue();
+        if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
-        if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+        if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op0;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) 
op0.getInputs().get(0).getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
         /** find the record variable */
-        InsertDeleteUpsertOperator primaryIndexModificationOp =
-                (InsertDeleteUpsertOperator) 
sinkOp.getInputs().get(0).getValue();
+        InsertDeleteUpsertOperator primaryIndexModificationOp = 
(InsertDeleteUpsertOperator) op0.getInputs().get(0)
+                .getValue();
         boolean isBulkload = primaryIndexModificationOp.isBulkload();
         ILogicalExpression newRecordExpr = 
primaryIndexModificationOp.getPayloadExpression().getValue();
-        List<Mutable<ILogicalExpression>> newMetaExprs =
-                
primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
+        List<Mutable<ILogicalExpression>> newMetaExprs = 
primaryIndexModificationOp
+                .getAdditionalNonFilteringExpressions();
         LogicalVariable newRecordVar;
         LogicalVariable newMetaVar = null;
 
@@ -111,9 +121,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
          * inputOp is the assign operator which extracts primary keys from the 
input
          * variables (record or meta)
          */
-        AbstractLogicalOperator inputOp =
-                (AbstractLogicalOperator) 
primaryIndexModificationOp.getInputs().get(0).getValue();
-
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) 
primaryIndexModificationOp.getInputs().get(0)
+                .getValue();
         newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0);
         if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
             if (newMetaExprs.size() > 1) {
@@ -168,7 +177,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
         // At this point, we have the data type info, and the indexes info as 
well
         int secondaryIndexTotalCnt = indexes.size() - 1;
         if (secondaryIndexTotalCnt > 0) {
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
         } else {
             return false;
         }
@@ -226,8 +235,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                      * is solved
                      */
                     || primaryIndexModificationOp.getOperation() == 
Kind.DELETE) {
-                injectFieldAccessesForIndexes(context, dataset, indexes, 
fieldVarsForNewRecord, recType,
-                        metaType, newRecordVar, newMetaVar, 
primaryIndexModificationOp, false);
+                injectFieldAccessesForIndexes(context, dataset, indexes, 
fieldVarsForNewRecord, recType, metaType,
+                        newRecordVar, newMetaVar, primaryIndexModificationOp, 
false);
                 if (replicateOp != null) {
                     
context.computeAndSetTypeEnvironmentForOperator(replicateOp);
                 }
@@ -237,13 +246,12 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
              * https://issues.apache.org/jira/browse/ASTERIXDB-1507
              * is solved
              */) {
-                List<LogicalVariable> beforeOpMetaVars =
-                        
primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
+                List<LogicalVariable> beforeOpMetaVars = 
primaryIndexModificationOp
+                        .getBeforeOpAdditionalNonFilteringVars();
                 LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? 
null : beforeOpMetaVars.get(0);
-                currentTop =
-                        injectFieldAccessesForIndexes(context, dataset, 
indexes, fieldVarsForBeforeOperation, recType,
-                                metaType, 
primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
-                                currentTop, true);
+                currentTop = injectFieldAccessesForIndexes(context, dataset, 
indexes, fieldVarsForBeforeOperation,
+                        recType, metaType, 
primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
+                        currentTop, true);
             }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
@@ -264,12 +272,11 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
             ILogicalOperator replicateOutput;
 
             for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                IndexFieldId indexFieldId =
-                        new 
IndexFieldId(index.getKeyFieldSourceIndicators().get(i), 
secondaryKeyFields.get(i));
+                IndexFieldId indexFieldId = new 
IndexFieldId(index.getKeyFieldSourceIndicators().get(i),
+                        secondaryKeyFields.get(i));
                 LogicalVariable skVar = 
fieldVarsForNewRecord.get(indexFieldId);
                 secondaryKeyVars.add(skVar);
-                secondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                        new VariableReferenceExpression(skVar)));
+                secondaryExpressions.add(new 
MutableObject<ILogicalExpression>(new VariableReferenceExpression(skVar)));
                 if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                     beforeOpSecondaryExpressions.add(new 
MutableObject<ILogicalExpression>(
                             new 
VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId))));
@@ -279,10 +286,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
             IndexInsertDeleteUpsertOperator indexUpdate;
             if (index.getIndexType() != IndexType.RTREE) {
                 // Create an expression per key
-                Mutable<ILogicalExpression> filterExpression =
-                        (primaryIndexModificationOp.getOperation() == 
Kind.UPSERT) ? null
-                                : createFilterExpression(secondaryKeyVars,
-                                        
context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds());
+                Mutable<ILogicalExpression> filterExpression = 
(primaryIndexModificationOp
+                        .getOperation() == Kind.UPSERT) ? null
+                                : createFilterExpression(secondaryKeyVars, 
context.getOutputTypeEnvironment(currentTop),
+                                        index.isEnforcingKeyFileds());
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, 
datasetName, mp);
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
@@ -311,8 +318,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     // Check the field type of the secondary key.
                     IAType secondaryKeyType;
                     Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(
-                            index.getKeyFieldTypes().get(0), 
secondaryKeyFields.get(0),
-                            recType);
+                            index.getKeyFieldTypes().get(0), 
secondaryKeyFields.get(0), recType);
                     secondaryKeyType = keyPairType.first;
 
                     List<Object> varTypes = new ArrayList<>();
@@ -333,8 +339,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new 
TokenizeOperator(dataSourceIndex,
                             
primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            tokenizeKeyVars,
-                            filterExpression, 
primaryIndexModificationOp.getOperation(),
+                            tokenizeKeyVars, filterExpression, 
primaryIndexModificationOp.getOperation(),
                             primaryIndexModificationOp.isBulkload(), 
isPartitioned, varTypes);
                     tokenUpdate.getInputs().add(new 
MutableObject<ILogicalOperator>(currentTop));
                     
context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
@@ -350,8 +355,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     // When TokenizeOperator is not needed
                     indexUpdate = new 
IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             
primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            filterExpression,
-                            primaryIndexModificationOp.getOperation(), 
primaryIndexModificationOp.isBulkload(),
+                            filterExpression, 
primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(),
                             
primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                     : 
primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     
indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
@@ -360,8 +365,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     if (primaryIndexModificationOp.getOperation() == 
Kind.UPSERT) {
                         
indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                         if (filteringFields != null) {
-                            
indexUpdate.setBeforeOpAdditionalFilteringExpression(new 
MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(
+                            
indexUpdate.setBeforeOpAdditionalFilteringExpression(
+                                    new MutableObject<ILogicalExpression>(new 
VariableReferenceExpression(
                                             
primaryIndexModificationOp.getBeforeOpFilterVar())));
                         }
                     }
@@ -472,7 +477,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
             }
             if (primaryIndexModificationOp.isBulkload()) {
                 // For bulk load, we connect all fanned out insert operator to 
a single SINK operator
-                sinkOp.getInputs().add(new 
MutableObject<ILogicalOperator>(indexUpdate));
+                op0.getInputs().add(new 
MutableObject<ILogicalOperator>(indexUpdate));
             }
 
         }
@@ -483,16 +488,15 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
         if (!primaryIndexModificationOp.isBulkload()) {
             // If this is an upsert, we need to
             // Remove the current input to the SINK operator (It is actually 
already removed above)
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
             // Connect the last index update to the SINK
-            sinkOp.getInputs().add(new 
MutableObject<ILogicalOperator>(currentTop));
+            op0.getInputs().add(new 
MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
     private LogicalVariable getRecordVar(IOptimizationContext context, 
AbstractLogicalOperator inputOp,
-            ILogicalExpression recordExpr,
-            int expectedRecordIndex) throws AlgebricksException {
+            ILogicalExpression recordExpr, int expectedRecordIndex) throws 
AlgebricksException {
         if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), 
recordExpr)) {
             return ((VariableReferenceExpression) 
recordExpr).getVariableReference();
         } else {
@@ -554,12 +558,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                 ARecordType sourceType = dataset.hasMetaPart()
                         ? indicators.get(i).intValue() == 
Index.RECORD_INDICATOR ? recType : metaType : recType;
                 LogicalVariable sourceVar = dataset.hasMetaPart()
-                        ? indicators.get(i).intValue() == 
Index.RECORD_INDICATOR ? recordVar : metaVar
-                        : recordVar;
+                        ? indicators.get(i).intValue() == 
Index.RECORD_INDICATOR ? recordVar : metaVar : recordVar;
                 LogicalVariable fieldVar = context.newVar();
                 // create record variable ref
-                Mutable<ILogicalExpression> varRef =
-                        new MutableObject<>(new 
VariableReferenceExpression(sourceVar));
+                Mutable<ILogicalExpression> varRef = new MutableObject<>(new 
VariableReferenceExpression(sourceVar));
                 IAType fieldType = 
sourceType.getSubFieldType(indexFieldId.fieldName);
                 AbstractFunctionCallExpression theFieldAccessFunc;
                 if (fieldType == null) {
@@ -567,24 +569,22 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     // make handling of records with incorrect value type for 
this field easier and cleaner
                     context.addNotToBeInlinedVar(fieldVar);
                     // create field access
-                    AbstractFunctionCallExpression fieldAccessFunc =
-                            getOpenOrNestedFieldAccessFunction(varRef, 
indexFieldId.fieldName);
+                    AbstractFunctionCallExpression fieldAccessFunc = 
getOpenOrNestedFieldAccessFunction(varRef,
+                            indexFieldId.fieldName);
                     // create cast
                     theFieldAccessFunc = new ScalarFunctionCallExpression(
                             
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
                     // The first argument is the field
-                    theFieldAccessFunc.getArguments()
-                            .add(new 
MutableObject<ILogicalExpression>(fieldAccessFunc));
-                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, 
skTypes.get(i),
-                            BuiltinType.ANY);
+                    theFieldAccessFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(fieldAccessFunc));
+                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, 
skTypes.get(i), BuiltinType.ANY);
                 } else {
                     // Get the desired field position
                     int pos = indexFieldId.fieldName.size() > 1 ? -1
                             : 
sourceType.getFieldIndex(indexFieldId.fieldName.get(0));
                     // Field not found --> This is either an open field or a 
nested field. it can't be accessed by index
-                    theFieldAccessFunc =
-                            (pos == -1) ? 
getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
-                                    : getClosedFieldAccessFunction(varRef, 
pos);
+                    theFieldAccessFunc = (pos == -1)
+                            ? getOpenOrNestedFieldAccessFunction(varRef, 
indexFieldId.fieldName)
+                            : getClosedFieldAccessFunction(varRef, pos);
                 }
                 vars.add(fieldVar);
                 exprs.add(new 
MutableObject<ILogicalExpression>(theFieldAccessFunc));
@@ -648,8 +648,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
     }
 
     private static Mutable<ILogicalExpression> 
constantToMutableLogicalExpression(IAObject constantObject) {
-        return new MutableObject<>(
-                new ConstantExpression(new 
AsterixConstantValue(constantObject)));
+        return new MutableObject<>(new ConstantExpression(new 
AsterixConstantValue(constantObject)));
     }
 
     private Mutable<ILogicalExpression> 
createFilterExpression(List<LogicalVariable> secondaryKeyVars,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
index 89280be..2eaad98 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.IAType;
@@ -38,6 +39,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -87,9 +89,16 @@ public class IntroduceStaticTypeCastForInsertRule implements 
IAlgebraicRewriteRu
         List<LogicalVariable> producedVariables = new 
ArrayList<LogicalVariable>();
         LogicalVariable oldRecordVariable;
 
-        if (op1.getOperatorTag() != LogicalOperatorTag.SINK) {
+        if (op1.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op1.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
+        if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op1;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
deleted file mode 100644
index 0c36d0b..0000000
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
-            throws AlgebricksException {
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.SINK) {
-            return false;
-        }
-        SinkOperator sinkOperator = (SinkOperator) op;
-
-        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
-        int datasetId = 0;
-        String dataverse = null;
-        String datasetName = null;
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
sinkOperator.getInputs().get(0).getValue();
-        LogicalVariable upsertVar = null;
-        AssignOperator upsertFlagAssign = null;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
-                IndexInsertDeleteUpsertOperator 
indexInsertDeleteUpsertOperator = (IndexInsertDeleteUpsertOperator) 
descendantOp;
-                if (!indexInsertDeleteUpsertOperator.isBulkload()
-                        && 
indexInsertDeleteUpsertOperator.getPrevSecondaryKeyExprs() == null) {
-                    primaryKeyExprs = 
indexInsertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) 
indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetId();
-                    dataverse = ((DatasetDataSource) 
indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDataverseName();
-                    datasetName = ((DatasetDataSource) 
indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetName();
-                    break;
-                }
-            } else if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
-                InsertDeleteUpsertOperator insertDeleteUpsertOperator = 
(InsertDeleteUpsertOperator) descendantOp;
-                if (!insertDeleteUpsertOperator.isBulkload()) {
-                    primaryKeyExprs = 
insertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetName();
-                    if (insertDeleteUpsertOperator.getOperation() == 
Kind.UPSERT) {
-                        //we need to add a function that checks if previous 
record was found
-                        upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc = new 
ScalarFunctionCallExpression(
-                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
-                        // is new value missing? -> this means that the 
expected operation is delete
-                        AbstractFunctionCallExpression isNewMissingFunc = new 
ScalarFunctionCallExpression(
-                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        
isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
-                        AbstractFunctionCallExpression isPrevMissingFunc = new 
ScalarFunctionCallExpression(
-                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        // argument is the previous record
-                        isPrevMissingFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(
-                                new 
VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
-                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isPrevMissingFunc));
-                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isNewMissingFunc));
-
-                        // AssignOperator puts in the cast var the casted 
record
-                        upsertFlagAssign = new AssignOperator(upsertVar, new 
MutableObject<ILogicalExpression>(orFunc));
-                        // Connect the current top of the plan to the cast 
operator
-                        upsertFlagAssign.getInputs()
-                                .add(new 
MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
-                        sinkOperator.getInputs().clear();
-                        sinkOperator.getInputs().add(new 
MutableObject<ILogicalOperator>(upsertFlagAssign));
-                        
context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
-                    }
-                    break;
-                }
-            }
-            if (descendantOp.getInputs().size() < 1) {
-                break;
-            }
-            descendantOp = (AbstractLogicalOperator) 
descendantOp.getInputs().get(0).getValue();
-        }
-
-        if (primaryKeyExprs == null) {
-            return false;
-        }
-
-        //copy primaryKeyExprs
-        List<LogicalVariable> primaryKeyLogicalVars = new 
ArrayList<LogicalVariable>();
-        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
-            VariableReferenceExpression varRefExpr = 
(VariableReferenceExpression) expr.getValue();
-            primaryKeyLogicalVars.add(new 
LogicalVariable(varRefExpr.getVariableReference().getId()));
-        }
-
-        //get JobId(TransactorId)
-        AqlMetadataProvider mp = (AqlMetadataProvider) 
context.getMetadataProvider();
-        JobId jobId = mp.getJobId();
-
-        //create the logical and physical operator
-        CommitOperator commitOperator = new 
CommitOperator(primaryKeyLogicalVars, upsertVar);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, 
dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar);
-        commitOperator.setPhysicalOperator(commitPOperator);
-
-        //create ExtensionOperator and put the commitOperator in it.
-        ExtensionOperator extensionOperator = new 
ExtensionOperator(commitOperator);
-        extensionOperator.setPhysicalOperator(commitPOperator);
-
-        //update plan link
-        extensionOperator.getInputs().add(sinkOperator.getInputs().get(0));
-        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
-        opRef.setValue(extensionOperator);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
new file mode 100644
index 0000000..4bbfce0
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.algebra.operators.physical.CommitPOperator;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+            return false;
+        }
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
+
+        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
+        int datasetId = 0;
+        String dataverse = null;
+        String datasetName = null;
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
eOp.getInputs().get(0).getValue();
+        LogicalVariable upsertVar = null;
+        while (descendantOp != null) {
+            if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
+                IndexInsertDeleteUpsertOperator operator = 
(IndexInsertDeleteUpsertOperator) descendantOp;
+                if (!operator.isBulkload() && 
operator.getPrevSecondaryKeyExprs() == null) {
+                    primaryKeyExprs = operator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) 
operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) 
operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) 
operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetName();
+                    break;
+                }
+            } else if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                InsertDeleteUpsertOperator insertDeleteUpsertOperator = 
(InsertDeleteUpsertOperator) descendantOp;
+                if (!insertDeleteUpsertOperator.isBulkload()) {
+                    primaryKeyExprs = 
insertDeleteUpsertOperator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) 
insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetName();
+                    if (insertDeleteUpsertOperator.getOperation() == 
Kind.UPSERT) {
+                        //we need to add a function that checks if previous 
record was found
+                        upsertVar = context.newVar();
+                        AbstractFunctionCallExpression orFunc = new 
ScalarFunctionCallExpression(
+                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
+                        // is new value missing? -> this means that the 
expected operation is delete
+                        AbstractFunctionCallExpression isNewMissingFunc = new 
ScalarFunctionCallExpression(
+                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        
isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
+                        AbstractFunctionCallExpression isPrevMissingFunc = new 
ScalarFunctionCallExpression(
+                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        // argument is the previous record
+                        isPrevMissingFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(
+                                new 
VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
+                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isPrevMissingFunc));
+                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isNewMissingFunc));
+
+                        // AssignOperator binds upsertVar to the result of the 
or-function
+                        AssignOperator upsertFlagAssign = new 
AssignOperator(upsertVar,
+                                new MutableObject<ILogicalExpression>(orFunc));
+                        // Connect the current top of the plan to the assign 
operator
+                        upsertFlagAssign.getInputs()
+                                .add(new 
MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        eOp.getInputs().clear();
+                        eOp.getInputs().add(new 
MutableObject<ILogicalOperator>(upsertFlagAssign));
+                        
context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
+                    }
+                    break;
+                }
+            }
+            if (descendantOp.getInputs().isEmpty()) {
+                break;
+            }
+            descendantOp = (AbstractLogicalOperator) 
descendantOp.getInputs().get(0).getValue();
+        }
+
+        if (primaryKeyExprs == null) {
+            return false;
+        }
+
+        //copy primaryKeyExprs
+        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<>();
+        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
+            VariableReferenceExpression varRefExpr = 
(VariableReferenceExpression) expr.getValue();
+            primaryKeyLogicalVars.add(new 
LogicalVariable(varRefExpr.getVariableReference().getId()));
+        }
+
+        //get JobId(TransactorId)
+        AqlMetadataProvider mp = (AqlMetadataProvider) 
context.getMetadataProvider();
+        JobId jobId = mp.getJobId();
+
+        //create the logical and physical operator
+        CommitOperator commitOperator = new 
CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        CommitPOperator commitPOperator = new CommitPOperator(jobId, 
dataverse, datasetName, datasetId,
+                primaryKeyLogicalVars, upsertVar, isSink);
+        commitOperator.setPhysicalOperator(commitPOperator);
+
+        //create DelegateOperator and put the commitOperator in it.
+        DelegateOperator extensionOperator = new 
DelegateOperator(commitOperator);
+        extensionOperator.setPhysicalOperator(commitPOperator);
+
+        //update plan link
+        extensionOperator.getInputs().add(eOp.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
+        opRef.setValue(extensionOperator);
+        return true;
+    }
+}

Reply via email to