Repository: asterixdb
Updated Branches:
  refs/heads/master 317628afe -> a691dd422


[ASTERIXDB-2429] Fix the upsert of primary key index

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Previously the primary key index was not properly maintained during
upsert. Since there is no secondary key in the primary key index, the
old value would always point to the primary key, which is always equal
to the new value. As a result, the primary key index was never
maintained during upsert.
- This patch fixes this bug with two changes:
First, if there is a primary key index, we always perform the upsert,
regardless of whether old value == new value.
Second, we use a boolean variable to indicate whether the operation
is an upsert or a delete, since for the primary key index the old value
cannot provide such information.

Change-Id: I925bd42ba67f70e94f5f5bc2d24151c8e2e20baf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2825
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a691dd42
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a691dd42
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a691dd42

Branch: refs/heads/master
Commit: a691dd422cfee860779d71f1e7dbde528660aebc
Parents: 317628a
Author: luochen01 <[email protected]>
Authored: Mon Jul 30 17:12:27 2018 -0700
Committer: Luo Chen <[email protected]>
Committed: Tue Jul 31 09:05:12 2018 -0700

----------------------------------------------------------------------
 ...IntroduceSecondaryIndexInsertDeleteRule.java |  6 ++
 .../LangExpressionToPlanTranslator.java         |  5 ++
 .../org/apache/asterix/utils/RebalanceUtil.java |  4 +-
 .../change-feed-with-meta-pk-index.1.ddl.sqlpp  | 60 ++++++++++++++++++++
 ...hange-feed-with-meta-pk-index.2.update.sqlpp | 25 ++++++++
 ...change-feed-with-meta-pk-index.3.query.sqlpp | 23 ++++++++
 .../change-feed-with-meta-pk-index.4.ddl.sqlpp  | 21 +++++++
 .../primary-secondary-btree.4.query.sqlpp       | 28 +++++++++
 .../change-feed-with-meta-pk-index.1.adm        |  1 +
 .../primary-secondary-btree.2.adm               |  1 +
 .../resources/runtimets/testsuite_sqlpp.xml     |  5 ++
 .../metadata/declared/MetadataProvider.java     | 46 +++++++++------
 .../asterix/metadata/utils/DatasetUtil.java     | 13 +++--
 .../LSMPrimaryUpsertOperatorNodePushable.java   |  9 +++
 .../LSMSecondaryUpsertOperatorDescriptor.java   |  9 ++-
 .../LSMSecondaryUpsertOperatorNodePushable.java | 27 ++++++++-
 .../algebra/metadata/IMetadataProvider.java     |  5 +-
 .../IndexInsertDeleteUpsertOperator.java        | 15 +++++
 .../logical/InsertDeleteUpsertOperator.java     | 25 ++++++++
 .../logical/visitors/UsedVariableVisitor.java   |  3 +
 .../IndexInsertDeleteUpsertPOperator.java       | 11 ++--
 .../SetAlgebricksPhysicalOperatorsRule.java     | 17 ++++--
 22 files changed, 319 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index ae74832..e123715 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -496,6 +496,12 @@ public class IntroduceSecondaryIndexInsertDeleteRule 
implements IAlgebraicRewrit
                     indexUpdate.getInputs().add(new 
MutableObject<ILogicalOperator>(assignCoordinates));
                 }
             }
+
+            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+                indexUpdate.setUpsertIndicatorExpr(new MutableObject<>(
+                        new 
VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar())));
+            }
+
             context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
             if (!primaryIndexModificationOp.isBulkload() || 
secondaryIndexTotalCnt == 1) {
                 currentTop = indexUpdate;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index aa4fb75..2c1e0f7 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -92,6 +92,7 @@ import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.FunctionInfo;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -522,6 +523,8 @@ class LangExpressionToPlanTranslator
             // A change feed, we don't need the assign to access PKs
             upsertOp = new InsertDeleteUpsertOperator(targetDatasource, 
varRef, varRefsForLoading, metaExpSingletonList,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp.setUpsertIndicatorVar(context.newVar());
+            upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the 
original record
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(targetDatasource.getItemType());
@@ -556,6 +559,8 @@ class LangExpressionToPlanTranslator
             
upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
             upsertOp.getInputs().add(new MutableObject<>(assign));
             upsertOp.setSourceLocation(sourceLoc);
+            upsertOp.setUpsertIndicatorVar(context.newVar());
+            upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the 
original record
             ARecordType recordType = (ARecordType) 
targetDatasource.getItemType();
             upsertOp.setPrevRecordVar(context.newVar());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index b74d739..483987c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -383,8 +383,8 @@ public class RebalanceUtil {
 
     // Gets the primary key permutation for upserts.
     private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
-        // prev record first
-        int f = 1;
+        // upsertIndicatorVar + prev record
+        int f = 2;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
             f++;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
new file mode 100644
index 0000000..332aa0d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create a primary key 
index and then ingest data (with deletes)
+ * Expected Res : Success
+ * Date         : 18th Jun 2018
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+`key`:string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key 
meta().`key`;
+create primary index primary_idx on KVStore;
+
+create feed KVChangeStream with {
+  "adapter-name" : "adapter",
+  "type-name" : "DocumentType",
+  "meta-type-name" : "KVMetaType",
+  "reader" : 
"org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory",
+  "parser" : "record-with-metadata",
+  "format" : "dcp",
+  "record-format" : "json",
+  "change-feed" : "true",
+  "key-indexes" : "0",
+  "key-indicators" : "1",
+  "num-of-records" : "1000",
+  "delete-cycle" : "5"
+};
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
new file mode 100644
index 0000000..03a83ff
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use KeyVerse;
+
+set `wait-for-completion-feed` "true";
+connect feed KVChangeStream to dataset KVStore;
+
+start feed KVChangeStream;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
new file mode 100644
index 0000000..6e29992
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use KeyVerse;
+
+select count(*)
+from KVStore x;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
new file mode 100644
index 0000000..89469bd
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+drop dataverse KeyVerse;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
new file mode 100644
index 0000000..3f08b2f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Upsert into a dataset which has a b-tree secondary index
+ * Expected Res : Success
+ * Date         : Sep 15th 2015
+ */
+
+use test;
+
+select count(*)
+from UpsertTo x;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
new file mode 100644
index 0000000..d0d0910
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
@@ -0,0 +1 @@
+{ "$1": 804 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
new file mode 100644
index 0000000..71c9709
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
@@ -0,0 +1 @@
+{ "$1": 9 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9ca1fbd..dedabbe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8865,6 +8865,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-index">
+        <output-dir compare="Text">change-feed-with-meta-pk-index</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="change-feed-with-meta-with-mixed-index">
         <output-dir 
compare="Text">change-feed-with-meta-with-mixed-index</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 695dcb8..b5bcece 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -57,6 +57,7 @@ import 
org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -668,7 +669,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             boolean bulkload) throws AlgebricksException {
         return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, 
dataSourceIndex, propagatedSchema,
                 inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterExpr, recordDesc,
-                context, spec, bulkload, null, null);
+                context, spec, bulkload, null, null, null);
     }
 
     @Override
@@ -680,7 +681,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             throws AlgebricksException {
         return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, 
dataSourceIndex, propagatedSchema,
                 inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterExpr, recordDesc,
-                context, spec, false, null, null);
+                context, spec, false, null, null, null);
     }
 
     @Override
@@ -688,12 +689,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             IDataSourceIndex<String, DataSourceId> dataSourceIndex, 
IOperatorSchema propagatedSchema,
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalFilteringKeys,
-            ILogicalExpression filterExpr, List<LogicalVariable> 
prevSecondaryKeys,
+            ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, 
List<LogicalVariable> prevSecondaryKeys,
             LogicalVariable prevAdditionalFilteringKey, RecordDescriptor 
recordDesc, JobGenContext context,
             JobSpecification spec) throws AlgebricksException {
         return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, 
dataSourceIndex, propagatedSchema,
                 inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalFilteringKeys, filterExpr, recordDesc,
-                context, spec, false, prevSecondaryKeys, 
prevAdditionalFilteringKey);
+                context, spec, false, upsertIndicatorVar, prevSecondaryKeys, 
prevAdditionalFilteringKey);
     }
 
     @Override
@@ -1042,8 +1043,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, ILogicalExpression 
filterExpr,
             RecordDescriptor inputRecordDesc, JobGenContext context, 
JobSpecification spec, boolean bulkload,
-            List<LogicalVariable> prevSecondaryKeys, LogicalVariable 
prevAdditionalFilteringKey)
-            throws AlgebricksException {
+            LogicalVariable upsertIndicatorVar, List<LogicalVariable> 
prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) throws 
AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String dataverseName = 
dataSourceIndex.getDataSource().getId().getDataverseName();
         String datasetName = 
dataSourceIndex.getDataSource().getId().getDatasourceName();
@@ -1062,18 +1063,19 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             case BTREE:
                 return getBTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, 
inputRecordDesc, context, spec, indexOp,
-                        bulkload, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
+                        bulkload, upsertIndicatorVar, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
             case RTREE:
                 return getRTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, 
inputRecordDesc, context, spec, indexOp,
-                        bulkload, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
+                        bulkload, upsertIndicatorVar, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
                 return getInvertedIndexRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, 
inputRecordDesc, context, spec, indexOp,
-                        secondaryIndex.getIndexType(), bulkload, 
prevSecondaryKeys, prevAdditionalFilteringKeys);
+                        secondaryIndex.getIndexType(), bulkload, 
upsertIndicatorVar, prevSecondaryKeys,
+                        prevAdditionalFilteringKeys);
             default:
                 throw new AlgebricksException(
                         indexOp.name() + "Insert, upsert, and delete not 
implemented for index type: "
@@ -1085,8 +1087,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor 
inputRecordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
LogicalVariable upsertIndicatorVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
         int numKeys = primaryKeys.size() + secondaryKeys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
@@ -1153,8 +1156,10 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, idfh, null,
                         BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
+                int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh,
-                        filterFactory, modificationCallbackFactory, 
prevFieldPermutation);
+                        filterFactory, modificationCallbackFactory, 
upsertIndicatorFieldIndex,
+                        BinaryBooleanInspector.FACTORY, prevFieldPermutation);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh,
                         filterFactory, false, modificationCallbackFactory);
@@ -1169,8 +1174,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor 
recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
LogicalVariable upsertIndicatorVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = MetadataManager.INSTANCE
@@ -1250,8 +1256,10 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, indexDataflowHelperFactory,
                     null, BulkLoadUsage.LOAD, dataset.getDatasetId());
         } else if (indexOp == IndexOperation.UPSERT) {
+            int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
-                    indexDataflowHelperFactory, filterFactory, 
modificationCallbackFactory, prevFieldPermutation);
+                    indexDataflowHelperFactory, filterFactory, 
modificationCallbackFactory, upsertIndicatorFieldIndex,
+                    BinaryBooleanInspector.FACTORY, prevFieldPermutation);
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, 
fieldPermutation, indexOp,
                     indexDataflowHelperFactory, filterFactory, false, 
modificationCallbackFactory);
@@ -1264,8 +1272,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor 
recordDesc, JobGenContext context,
             JobSpecification spec, IndexOperation indexOp, IndexType 
indexType, boolean bulkload,
-            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
-            throws AlgebricksException {
+            LogicalVariable upsertIndicatorVar, List<LogicalVariable> 
prevSecondaryKeys,
+            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
         // Check the index is length-partitioned or not.
         boolean isPartitioned;
         if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -1359,8 +1367,10 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, indexDataFlowFactory,
                         null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
+                int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, 
recordDesc, fieldPermutation, indexDataFlowFactory,
-                        filterFactory, modificationCallbackFactory, 
prevFieldPermutation);
+                        filterFactory, modificationCallbackFactory, 
upsertIndicatorFieldIndex,
+                        BinaryBooleanInspector.FACTORY, prevFieldPermutation);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, fieldPermutation, indexOp,
                         indexDataFlowFactory, filterFactory, false, 
modificationCallbackFactory);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index bbdfadf..28c612f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -407,15 +407,20 @@ public class DatasetUtil {
         IIndexDataflowHelperFactory idfh =
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         LSMPrimaryUpsertOperatorDescriptor op;
-        ITypeTraits[] outputTypeTraits =
-                new ITypeTraits[inputRecordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-        ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[inputRecordDesc.getFieldCount()
+        ITypeTraits[] outputTypeTraits = new 
ITypeTraits[inputRecordDesc.getFieldCount() + 1
+                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[inputRecordDesc.getFieldCount() + 1
                 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
         IDataFormat dataFormat = metadataProvider.getDataFormat();
 
-        // add the previous record first
         int f = 0;
+        // add the upsert indicator var
+        outputSerDes[f] = 
dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
+        outputTypeTraits[f] = 
dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+        f++;
+        // add the previous record
         outputSerDes[f] = 
dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = 
dataFormat.getTypeTraitProvider().getTypeTrait(itemType);
         f++;
         // add the previous meta second
         if (dataset.hasMetaPart()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3df1b13..ba8074f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -30,6 +30,7 @@ import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
+import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -158,11 +159,13 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                             if (cursor.hasNext()) {
                                 cursor.next();
                                 prevTuple = cursor.getTuple();
+                                appendUpsertIndicator(!isDelete);
                                 appendFilterToPrevTuple();
                                 appendPrevRecord();
                                 appendPreviousMeta();
                                 appendFilterToOutput();
                             } else {
+                                appendUpsertIndicator(!isDelete);
                                 appendPreviousTupleAsMissing();
                             }
                         } finally {
@@ -170,6 +173,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         }
                     } else {
                         searchCallback.before(key); // lock
+                        appendUpsertIndicator(!isDelete);
                         appendPreviousTupleAsMissing();
                     }
                     if (isDelete && prevTuple != null) {
@@ -330,6 +334,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
+    private void appendUpsertIndicator(boolean isUpsert) throws IOException {
+        recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : 
ABoolean.FALSE, dos);
+        tb.addFieldEndOffset();
+    }
+
     private void appendPrevRecord() throws IOException {
         dos.write(prevTuple.getFieldData(numOfPrimaryKeys), 
prevTuple.getFieldStart(numOfPrimaryKeys),
                 prevTuple.getFieldLength(numOfPrimaryKeys));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index 958288a..df658b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.operators;
 
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -34,14 +35,19 @@ public class LSMSecondaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpe
 
     private static final long serialVersionUID = 1L;
     private final int[] prevValuePermutation;
+    private final int upsertIndiatorFieldIndex;
+    private final IBinaryBooleanInspectorFactory 
upsertIndicatorInspectorFactory;
 
     public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory 
indexHelperFactory,
             ITupleFilterFactory tupleFilterFactory, 
IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory 
upsertIndicatorInspectorFactory,
             int[] prevValuePermutation) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, 
indexHelperFactory, tupleFilterFactory, false,
                 modificationOpCallbackFactory);
         this.prevValuePermutation = prevValuePermutation;
+        this.upsertIndiatorFieldIndex = upsertIndicatorFieldIndex;
+        this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory;
     }
 
     @Override
@@ -49,6 +55,7 @@ public class LSMSecondaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpe
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         RecordDescriptor intputRecDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, 
indexHelperFactory, modCallbackFactory,
-                tupleFilterFactory, fieldPermutation, intputRecDesc, 
prevValuePermutation);
+                tupleFilterFactory, fieldPermutation, intputRecDesc, 
upsertIndiatorFieldIndex,
+                upsertIndicatorInspectorFactory, prevValuePermutation);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index b928131..2dc7f5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -19,15 +19,19 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.TypeTagUtil;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,22 +60,31 @@ import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDel
 public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     private final PermutingFrameTupleReference prevValueTuple = new 
PermutingFrameTupleReference();
+    private final int upsertIndicatorFieldIndex;
+    private final IBinaryBooleanInspector upsertIndicatorInspector;
     private final int numberOfFields;
     private AbstractIndexModificationOperationCallback abstractModCallback;
+    private final boolean isPrimaryKeyIndex;
 
     public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
             IIndexDataflowHelperFactory indexHelperFactory, 
IModificationOperationCallbackFactory modCallbackFactory,
             ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation, 
RecordDescriptor inputRecDesc,
+            int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory 
upsertIndicatorInspectorFactory,
             int[] prevValuePermutation) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, 
inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, tupleFilterFactory);
         this.prevValueTuple.setFieldPermutation(prevValuePermutation);
+        this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
+        this.upsertIndicatorInspector = 
upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx);
         this.numberOfFields = prevValuePermutation.length;
+        // a primary key index only has primary keys, and thus these two 
permutations are the same
+        this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation, 
prevValuePermutation);
     }
 
     @Override
     public void open() throws HyracksDataException {
         super.open();
+        frameTuple = new FrameTupleReference();
         abstractModCallback = (AbstractIndexModificationOperationCallback) 
modCallback;
     }
 
@@ -82,9 +95,15 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
             try {
+                frameTuple.reset(accessor, i);
+                boolean isUpsert =
+                        
upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
+                                
frameTuple.getFieldStart(upsertIndicatorFieldIndex),
+                                
frameTuple.getFieldLength(upsertIndicatorFieldIndex));
                 // if both previous value and new value are null, then we skip
                 tuple.reset(accessor, i);
                 prevValueTuple.reset(accessor, i);
+
                 boolean isNewValueMissing = isMissing(tuple, 0);
                 boolean isOldValueMissing = isMissing(prevValueTuple, 0);
                 if (isNewValueMissing && isOldValueMissing) {
@@ -92,8 +111,10 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
                     continue;
                 }
                 // At least, one is not null
-                // If they are equal, then we skip
-                if (TupleUtils.equalTuples(tuple, prevValueTuple, 
numberOfFields)) {
+                if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple, 
prevValueTuple, numberOfFields)) {
+                    // For a secondary index, if the secondary key values do 
not change, we can skip upserting it.
+                    // However, for a primary key index, we cannot do this 
because it only contains primary keys
+                    // which are always the same
                     continue;
                 }
                 if (!isOldValueMissing) {
@@ -101,7 +122,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
                     abstractModCallback.setOp(Operation.DELETE);
                     lsmAccessor.forceDelete(prevValueTuple);
                 }
-                if (!isNewValueMissing) {
+                if (isUpsert && !isNewValueMissing) {
                     // we need to insert the new value
                     abstractModCallback.setOp(Operation.INSERT);
                     lsmAccessor.forceInsert(tuple);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index efa9c1c..3d004a2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -205,8 +205,9 @@ public interface IMetadataProvider<S, I> {
             IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema 
propagatedSchema, IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> 
primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, ILogicalExpression 
filterExpr,
-            List<LogicalVariable> prevSecondaryKeys, LogicalVariable 
prevAdditionalFilteringKeys,
-            RecordDescriptor inputDesc, JobGenContext context, 
JobSpecification spec) throws AlgebricksException;
+            LogicalVariable upsertIndicatorVar, List<LogicalVariable> 
prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor 
inputDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException;
 
     public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] 
inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, 
JobGenContext context)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 31a1294..154fb13 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -49,6 +49,7 @@ public class IndexInsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     // used for upsert operations
     private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
     private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
+    private Mutable<ILogicalExpression> upsertIndicatorExpr;
     private final int numberOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> 
dataSourceIndex,
@@ -93,6 +94,12 @@ public class IndexInsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
                 }
             }
         }
+
+        // Upsert indicator var <For upsert>
+        if (upsertIndicatorExpr != null && 
visitor.transform(upsertIndicatorExpr)) {
+            b = true;
+        }
+
         // Old secondary <For upsert>
         if (prevSecondaryKeyExprs != null) {
             for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
@@ -189,4 +196,12 @@ public class IndexInsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     public int getNumberOfAdditionalNonFilteringFields() {
         return numberOfAdditionalNonFilteringFields;
     }
+
+    public Mutable<ILogicalExpression> getUpsertIndicatorExpr() {
+        return upsertIndicatorExpr;
+    }
+
+    public void setUpsertIndicatorExpr(Mutable<ILogicalExpression> 
upsertIndicatorExpr) {
+        this.upsertIndicatorExpr = upsertIndicatorExpr;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 9838c12..ae90462 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -59,6 +59,9 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     // previous additional fields (for UPSERT)
     private List<LogicalVariable> prevAdditionalNonFilteringVars;
     private List<Object> prevAdditionalNonFilteringTypes;
+    // a boolean variable that indicates whether it's a delete operation 
(false) or upsert operation (true)
+    private LogicalVariable upsertIndicatorVar;
+    private Object upsertIndicatorVarType;
 
     public InsertDeleteUpsertOperator(IDataSource<?> dataSource, 
Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -85,6 +88,7 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     public void recomputeSchema() throws AlgebricksException {
         schema = new ArrayList<LogicalVariable>();
         if (operation == Kind.UPSERT) {
+            schema.add(upsertIndicatorVar);
             // The upsert case also produces the previous record
             schema.add(prevRecordVar);
             if (additionalNonFilteringExpressions != null) {
@@ -98,6 +102,9 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     }
 
     public void getProducedVariables(Collection<LogicalVariable> 
producedVariables) {
+        if (upsertIndicatorVar != null) {
+            producedVariables.add(upsertIndicatorVar);
+        }
         if (prevRecordVar != null) {
             producedVariables.add(prevRecordVar);
         }
@@ -147,6 +154,7 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
             public void propagateVariables(IOperatorSchema target, 
IOperatorSchema... sources)
                     throws AlgebricksException {
                 if (operation == Kind.UPSERT) {
+                    target.addVariable(upsertIndicatorVar);
                     target.addVariable(prevRecordVar);
                     if (prevAdditionalNonFilteringVars != null) {
                         for (LogicalVariable var : 
prevAdditionalNonFilteringVars) {
@@ -171,6 +179,7 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
     public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         PropagatingTypeEnvironment env = 
createPropagatingAllInputsTypeEnvironment(ctx);
         if (operation == Kind.UPSERT) {
+            env.setVarType(upsertIndicatorVar, upsertIndicatorVarType);
             env.setVarType(prevRecordVar, prevRecordType);
             if (prevAdditionalNonFilteringVars != null) {
                 for (int i = 0; i < prevAdditionalNonFilteringVars.size(); 
i++) {
@@ -224,6 +233,22 @@ public class InsertDeleteUpsertOperator extends 
AbstractLogicalOperator {
         this.prevRecordVar = prevRecordVar;
     }
 
+    public LogicalVariable getUpsertIndicatorVar() {
+        return upsertIndicatorVar;
+    }
+
+    public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) {
+        this.upsertIndicatorVar = upsertIndicatorVar;
+    }
+
+    public Object getUpsertIndicatorVarType() {
+        return upsertIndicatorVarType;
+    }
+
+    public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) {
+        this.upsertIndicatorVarType = upsertIndicatorVarType;
+    }
+
     public void setPrevRecordType(Object recordType) {
         prevRecordType = recordType;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d57a998..e66809e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -421,6 +421,9 @@ public class UsedVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void>
                 e.getValue().getUsedVariables(usedVariables);
             }
         }
+        if (op.getUpsertIndicatorExpr() != null) {
+            
op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables);
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 92fa86d..228ca52 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -54,14 +54,16 @@ public class IndexInsertDeleteUpsertPOperator extends 
AbstractPhysicalOperator {
     private final ILogicalExpression filterExpr;
     private final IDataSourceIndex<?, ?> dataSourceIndex;
     private final List<LogicalVariable> additionalFilteringKeys;
+    private final LogicalVariable upsertIndicatorVar;
     private final List<LogicalVariable> prevSecondaryKeys;
     private final LogicalVariable prevAdditionalFilteringKey;
     private final int numOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, 
List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, 
Mutable<ILogicalExpression> filterExpr,
-            IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> 
prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey, int 
numOfAdditionalNonFilteringFields) {
+            IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable 
upsertIndicatorVar,
+            List<LogicalVariable> prevSecondaryKeys, LogicalVariable 
prevAdditionalFilteringKey,
+            int numOfAdditionalNonFilteringFields) {
         this.primaryKeys = primaryKeys;
         this.secondaryKeys = secondaryKeys;
         if (filterExpr != null) {
@@ -71,6 +73,7 @@ public class IndexInsertDeleteUpsertPOperator extends 
AbstractPhysicalOperator {
         }
         this.dataSourceIndex = dataSourceIndex;
         this.additionalFilteringKeys = additionalFilteringKeys;
+        this.upsertIndicatorVar = upsertIndicatorVar;
         this.prevSecondaryKeys = prevSecondaryKeys;
         this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
         this.numOfAdditionalNonFilteringFields = 
numOfAdditionalNonFilteringFields;
@@ -132,8 +135,8 @@ public class IndexInsertDeleteUpsertPOperator extends 
AbstractPhysicalOperator {
                 break;
             case UPSERT:
                 runtimeAndConstraints = 
mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
-                        typeEnv, primaryKeys, secondaryKeys, 
additionalFilteringKeys, filterExpr, prevSecondaryKeys,
-                        prevAdditionalFilteringKey, inputDesc, context, spec);
+                        typeEnv, primaryKeys, secondaryKeys, 
additionalFilteringKeys, filterExpr, upsertIndicatorVar,
+                        prevSecondaryKeys, prevAdditionalFilteringKey, 
inputDesc, context, spec);
                 break;
             default:
                 throw new AlgebricksException("Unsupported Operation " + 
operation);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a691dd42/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index d277043..4869761 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -356,9 +356,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
                                 new IndexBulkloadPOperator(primaryKeys, 
secondaryKeys, additionalFilteringKeys,
                                         opInsDel.getFilterExpression(), 
opInsDel.getDataSourceIndex()));
                     } else {
+                        LogicalVariable upsertIndicatorVar = null;
                         List<LogicalVariable> prevSecondaryKeys = null;
                         LogicalVariable prevAdditionalFilteringKey = null;
                         if (opInsDel.getOperation() == Kind.UPSERT) {
+                            upsertIndicatorVar = 
getKey(opInsDel.getUpsertIndicatorExpr().getValue());
                             prevSecondaryKeys = new 
ArrayList<LogicalVariable>();
                             getKeys(opInsDel.getPrevSecondaryKeyExprs(), 
prevSecondaryKeys);
                             if 
(opInsDel.getPrevAdditionalFilteringExpression() != null) {
@@ -369,7 +371,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
                         }
                         op.setPhysicalOperator(new 
IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
                                 additionalFilteringKeys, 
opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
-                                prevSecondaryKeys, prevAdditionalFilteringKey,
+                                upsertIndicatorVar, prevSecondaryKeys, 
prevAdditionalFilteringKey,
                                 
opInsDel.getNumberOfAdditionalNonFilteringFields()));
                     }
                     break;
@@ -407,12 +409,15 @@ public class SetAlgebricksPhysicalOperatorsRule 
implements IAlgebraicRewriteRule
 
     private static void getKeys(List<Mutable<ILogicalExpression>> 
keyExpressions, List<LogicalVariable> keys) {
         for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
-            ILogicalExpression e = kExpr.getValue();
-            if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new NotImplementedException();
-            }
-            keys.add(((VariableReferenceExpression) e).getVariableReference());
+            keys.add(getKey(kExpr.getValue()));
+        }
+    }
+
+    private static LogicalVariable getKey(ILogicalExpression keyExpression) {
+        if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) 
{
+            throw new NotImplementedException();
         }
+        return ((VariableReferenceExpression) 
keyExpression).getVariableReference();
     }
 
     private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> 
payloadExpr,

Reply via email to