This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3584174 [ASTERIXDB-2791][IDX] Make BTree secondary indexes accept
NULLs/MISSINGs
3584174 is described below
commit 3584174b2a5cafa1ae7b90795f6dd4e0a9be6ed3
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue Jul 6 00:48:24 2021 +0300
[ASTERIXDB-2791][IDX] Make BTree secondary indexes accept NULLs/MISSINGs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch allows BTree secondary indexes to store NULL and MISSING
values; they are no longer filtered out when the index is built.
Change-Id: I3342caa38f52d8d7019bbcd5bf81fc0cc01ca110
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12065
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../IntroduceSecondaryIndexInsertDeleteRule.java | 4 +-
.../translator/LangExpressionToPlanTranslator.java | 8 +--
.../asterix/app/function/DumpIndexReader.java | 8 ++-
.../org/apache/asterix/utils/RebalanceUtil.java | 2 +-
.../ddl/index-bad-fields/index-bad-fields.004.adm | 1 +
.../insert_nulls_with_secondary_idx.4.adm | 4 ++
.../insert_nulls_with_secondary_idx.5.adm | 6 ++
.../insert_nulls_with_secondary_idx.8.adm | 6 ++
.../asterix/common/storage/IndexCheckpoint.java | 2 +-
.../metadata/declared/MetadataProvider.java | 52 +++++++--------
.../apache/asterix/metadata/utils/DatasetUtil.java | 6 +-
.../utils/SecondaryBTreeOperationsHelper.java | 20 +-----
.../LSMPrimaryUpsertOperatorNodePushable.java | 25 +++++--
.../LSMSecondaryUpsertOperatorDescriptor.java | 18 ++---
.../LSMSecondaryUpsertOperatorNodePushable.java | 77 ++++++++--------------
...daryUpsertWithNestedPlanOperatorDescriptor.java | 16 ++---
...ryUpsertWithNestedPlanOperatorNodePushable.java | 17 +++--
.../core/algebra/metadata/IMetadataProvider.java | 8 +--
.../logical/IndexInsertDeleteUpsertOperator.java | 20 +++---
.../logical/InsertDeleteUpsertOperator.java | 30 ++++-----
.../visitors/IsomorphismOperatorVisitor.java | 2 +-
.../visitors/SubstituteVariableVisitor.java | 6 +-
.../logical/visitors/UsedVariableVisitor.java | 4 +-
.../physical/IndexInsertDeleteUpsertPOperator.java | 8 +--
.../rules/SetAlgebricksPhysicalOperatorsRule.java | 9 ++-
25 files changed, 178 insertions(+), 181 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index e567974..97954eb 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -629,8 +629,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule
implements IAlgebraicRewrit
}
if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
- indexUpdate.setUpsertIndicatorExpr(new MutableObject<>(
- new
VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar())));
+ indexUpdate.setOperationExpr(new MutableObject<>(
+ new
VariableReferenceExpression(primaryIndexModificationOp.getOperationVar())));
}
context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 16d5878..c053ab9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -513,8 +513,8 @@ abstract class LangExpressionToPlanTranslator
upsertOp = new InsertDeleteUpsertOperator(targetDatasource,
payloadVarRef, varRefsForLoading,
Collections.singletonList(new
MutableObject<>(metaVarRef)), InsertDeleteUpsertOperator.Kind.UPSERT,
false);
- upsertOp.setUpsertIndicatorVar(context.newVar());
- upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+ upsertOp.setOperationVar(context.newVar());
+ upsertOp.setOperationVarType(BuiltinType.AINT8);
// Create and add a new variable used for representing the
original record
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(targetDatasource.getItemType());
@@ -567,8 +567,8 @@ abstract class LangExpressionToPlanTranslator
upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
upsertOp.setAdditionalFilteringExpressions(filterExprs);
upsertOp.setSourceLocation(sourceLoc);
- upsertOp.setUpsertIndicatorVar(context.newVar());
- upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+ upsertOp.setOperationVar(context.newVar());
+ upsertOp.setOperationVarType(BuiltinType.AINT8);
// Create and add a new variable used for representing the
original record
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(recordType);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 2a22ac6..1ed57c0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,7 +100,11 @@ public class DumpIndexReader extends FunctionReader {
recordBuilder.append("{\"values\":[");
for (int j = 0; j < tuple.getFieldCount(); ++j) {
bbis.setByteBuffer(ByteBuffer.wrap(tuple.getFieldData(j)),
tuple.getFieldStart(j));
-
recordBuilder.append(secondaryRecDesc.getFields()[j].deserialize(dis));
+ IAObject field = (IAObject)
secondaryRecDesc.getFields()[j].deserialize(dis);
+ if (field.getType().getTypeTag() == ATypeTag.MISSING) {
+ continue;
+ }
+ recordBuilder.append(field);
recordBuilder.append(",");
}
recordBuilder.deleteCharAt(recordBuilder.length() - 1);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ccb73b6..fcdc25a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -389,7 +389,7 @@ public class RebalanceUtil {
// Gets the primary key permutation for upserts.
private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
- // upsertIndicatorVar + prev record
+ // (upsert) operationVar + prev record
int f = 2;
// add the previous meta second
if (dataset.hasMetaPart()) {
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
index ac11618..272c3c8 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
@@ -1 +1,2 @@
+{ "values": [ 1 ] }
{ "values": [ 95, 2 ] }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
index fab2047..4725f37 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
@@ -1,3 +1,7 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
{ "values": [ 555, 5 ] }
{ "values": [ 888, 8 ] }
{ "values": [ 999, 9 ] }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
index fab2047..8d89e67 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ null, 6 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
+{ "values": [ 7 ] }
{ "values": [ 555, 5 ] }
{ "values": [ 888, 8 ] }
{ "values": [ 999, 9 ] }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
index 85c91e6..50f4b46 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 7 ] }
+{ "values": [ null, 9 ] }
+{ "values": [ 3 ] }
+{ "values": [ 6 ] }
+{ "values": [ 8 ] }
{ "values": [ 222, 2 ] }
{ "values": [ 444, 4 ] }
{ "values": [ 555, 5 ] }
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 878c94e..832f3e9 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -38,7 +38,7 @@ public class IndexCheckpoint {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
// TODO(mblow): remove this marker & related logic once we no longer are
able to read indexes prior to the fix
- private static final long HAS_NULL_MISSING_VALUES_FIX = -2;
+ private static final long HAS_NULL_MISSING_VALUES_FIX = -3;
private long id;
private long validComponentSequence;
private long lowWatermark;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1721975..d970b3a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -69,8 +69,8 @@ import
org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.MetadataManager;
@@ -803,12 +803,12 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
IDataSourceIndex<String, DataSourceId> dataSourceIndex,
IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalFilteringKeys,
- ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar,
List<LogicalVariable> prevSecondaryKeys,
+ ILogicalExpression filterExpr, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys,
LogicalVariable prevAdditionalFilteringKey, RecordDescriptor
recordDesc, JobGenContext context,
JobSpecification spec, List<List<AlgebricksPipeline>>
secondaryKeysPipelines) throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT,
dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys,
additionalFilteringKeys, filterExpr, recordDesc,
- context, spec, false, upsertIndicatorVar, prevSecondaryKeys,
prevAdditionalFilteringKey,
+ context, spec, false, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKey,
secondaryKeysPipelines, null);
}
@@ -1213,7 +1213,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression
filterExpr,
RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification spec, boolean bulkload,
- LogicalVariable upsertIndicatorVar, List<LogicalVariable>
prevSecondaryKeys,
+ LogicalVariable operationVar, List<LogicalVariable>
prevSecondaryKeys,
LogicalVariable prevAdditionalFilteringKey,
List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
@@ -1246,30 +1246,30 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
case BTREE:
return getBTreeRuntime(dataverseName, datasetName, indexName,
propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory,
inputRecordDesc, context, spec, indexOp,
- bulkload, upsertIndicatorVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
+ bulkload, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
case ARRAY:
if (bulkload) {
// In the case of bulk-load, we do not handle any nested
plans. We perform the exact same behavior
// as a normal B-Tree bulk load.
return getBTreeRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields,
filterFactory, inputRecordDesc, context, spec,
- indexOp, bulkload, upsertIndicatorVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
+ indexOp, bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
} else {
return getArrayIndexRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
- additionalNonKeyFields, inputRecordDesc, spec,
indexOp, upsertIndicatorVar,
+ additionalNonKeyFields, inputRecordDesc, spec,
indexOp, operationVar,
secondaryKeysPipelines);
}
case RTREE:
return getRTreeRuntime(dataverseName, datasetName, indexName,
propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory,
inputRecordDesc, context, spec, indexOp,
- bulkload, upsertIndicatorVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
+ bulkload, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
return getInvertedIndexRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory,
inputRecordDesc, context, spec, indexOp,
- secondaryIndex.getIndexType(), bulkload,
upsertIndicatorVar, prevSecondaryKeys,
+ secondaryIndex.getIndexType(), bulkload, operationVar,
prevSecondaryKeys,
prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
@@ -1281,7 +1281,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor
inputRecordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable upsertIndicatorVar,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable>
prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
@@ -1350,10 +1350,10 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false, idfh, null,
BulkLoadUsage.LOAD, dataset.getDatasetId(),
filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex =
propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, idfh,
- filterFactory, modificationCallbackFactory,
upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory,
operationFieldIndex, BinaryIntegerInspector.FACTORY,
+ prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, indexOp, idfh,
filterFactory, false, modificationCallbackFactory);
@@ -1367,8 +1367,8 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getArrayIndexRuntime(DataverseName dataverseName,
String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> additionalNonKeyFields, RecordDescriptor
inputRecordDesc, JobSpecification spec,
- IndexOperation indexOp, LogicalVariable upsertIndicatorVar,
- List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws
AlgebricksException {
+ IndexOperation indexOp, LogicalVariable operationVar,
List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
int numPrimaryKeys = primaryKeys.size();
@@ -1404,9 +1404,9 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
storageComponentProvider.getStorageManager(),
splitsAndConstraint.first);
IOperatorDescriptor op;
if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex =
propagatedSchema.findVariable(operationVar);
op = new
LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
fieldPermutation,
- idfh, modificationCallbackFactory,
upsertIndicatorFieldIndex, BinaryBooleanInspector.FACTORY,
+ idfh, modificationCallbackFactory,
operationFieldIndex, BinaryIntegerInspector.FACTORY,
secondaryKeysPipelines.get(0),
secondaryKeysPipelines.get(1));
} else {
op = new
LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
@@ -1422,7 +1422,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor
recordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable upsertIndicatorVar,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable>
prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
@@ -1505,10 +1505,10 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false,
indexDataflowHelperFactory, null, BulkLoadUsage.LOAD,
dataset.getDatasetId(), filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex =
propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc,
fieldPermutation,
- indexDataflowHelperFactory, filterFactory,
modificationCallbackFactory, upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ indexDataflowHelperFactory, filterFactory,
modificationCallbackFactory, operationFieldIndex,
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory, false,
modificationCallbackFactory);
@@ -1521,7 +1521,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory,
RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp,
- IndexType indexType, boolean bulkload, LogicalVariable
upsertIndicatorVar,
+ IndexType indexType, boolean bulkload, LogicalVariable
operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable>
prevAdditionalFilteringKeys)
throws AlgebricksException {
// Check the index is length-partitioned or not.
@@ -1536,7 +1536,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
// Sanity checks.
if (primaryKeys.size() > 1) {
throw new AlgebricksException(
- "Cannot create inverted index on " + dataset(PLURAL) +
"with composite primary key.");
+ "Cannot create inverted index on " + dataset(PLURAL) + "
with composite primary key.");
}
// The size of secondaryKeys can be two if it receives input from its
// TokenizeOperator- [token, number of token]
@@ -1618,10 +1618,10 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false, indexDataFlowFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId(),
filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
+ int upsertOperationFieldIndex =
propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec,
recordDesc, fieldPermutation, indexDataFlowFactory,
- filterFactory, modificationCallbackFactory,
upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory,
upsertOperationFieldIndex,
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, false,
modificationCallbackFactory);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2e10d77..158efb2 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -475,9 +475,9 @@ public class DatasetUtil {
IDataFormat dataFormat = metadataProvider.getDataFormat();
int f = 0;
- // add the upsert indicator var
- outputSerDes[f] =
dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
- outputTypeTraits[f] =
dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+ // add the upsert operation var
+ outputSerDes[f] =
dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.AINT8);
+ outputTypeTraits[f] =
dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.AINT8);
f++;
// add the previous record
outputSerDes[f] =
dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 33f5b62..9645e7a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -89,12 +89,6 @@ public class SecondaryBTreeOperationsHelper extends
SecondaryTreeIndexOperations
AlgebricksMetaOperatorDescriptor asterixAssignOp =
createExternalAssignOp(spec,
indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- // If any of the secondary fields are nullable, then add a select
op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- selectOp = createFilterNullsSelectOp(spec,
indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- }
-
// Sort by secondary keys.
ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
secondaryComparatorFactories, secondaryRecDesc);
// Create secondary BTree bulk load op.
@@ -117,12 +111,7 @@ public class SecondaryBTreeOperationsHelper extends
SecondaryTreeIndexOperations
spec.connect(new OneToOneConnectorDescriptor(spec),
secondaryBulkLoadOp, 0, metaOp, 0);
root = metaOp;
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0,
asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- spec.connect(new OneToOneConnectorDescriptor(spec),
asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp,
0, sortOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec),
asterixAssignOp, 0, sortOp, 0);
- }
+ spec.connect(new OneToOneConnectorDescriptor(spec),
asterixAssignOp, 0, sortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0,
secondaryBulkLoadOp, 0);
spec.addRoot(root);
spec.setConnectorPolicyAssignmentPolicy(new
ConnectorPolicyAssignmentPolicy());
@@ -149,13 +138,6 @@ public class SecondaryBTreeOperationsHelper extends
SecondaryTreeIndexOperations
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0,
targetOp, 0);
sourceOp = targetOp;
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- // if any of the secondary fields are nullable, then add a
select op that filters nulls.
- // assign op ----> select op
- targetOp = createFilterNullsSelectOp(spec,
indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp,
0, targetOp, 0);
- sourceOp = targetOp;
- }
// no need to sort if the index is secondary primary index
if (!indexDetails.getKeyFieldNames().isEmpty()) {
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8e0de5f..591ff9a 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -30,7 +30,7 @@ import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
-import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.AInt8;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -80,6 +80,9 @@ import org.apache.logging.log4j.Logger;
public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDeleteOperatorNodePushable {
+ public static final AInt8 UPSERT_NEW = new AInt8((byte) 0);
+ public static final AInt8 UPSERT_EXISTING = new AInt8((byte) 1);
+ public static final AInt8 DELETE_EXISTING = new AInt8((byte) 2);
private static final Logger LOGGER = LogManager.getLogger();
private static final ThreadLocal<DateFormat> DATE_FORMAT =
ThreadLocal.withInitial(() -> new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
@@ -172,21 +175,22 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
if (cursor.hasNext()) {
cursor.next();
prevTuple = cursor.getTuple();
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(!isDelete, true);
appendFilterToPrevTuple();
appendPrevRecord();
appendPreviousMeta();
appendFilterToOutput();
} else {
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(!isDelete, false);
appendPreviousTupleAsMissing();
}
} finally {
cursor.close(); // end the search
}
} else {
+ // simple upsert into a non-filtered dataset having no
secondary indexes
searchCallback.before(key); // lock
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(true, false);
appendPreviousTupleAsMissing();
}
beforeModification(tuple);
@@ -353,8 +357,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
}
}
- protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
- recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE :
ABoolean.FALSE, dos);
+ @SuppressWarnings("unchecked") // using serializer
+ protected void appendOperationIndicator(boolean isUpsert, boolean
prevTupleExists) throws IOException {
+ if (isUpsert) {
+ if (prevTupleExists) {
+ recordDesc.getFields()[0].serialize(UPSERT_EXISTING, dos);
+ } else {
+ recordDesc.getFields()[0].serialize(UPSERT_NEW, dos);
+ }
+ } else {
+ recordDesc.getFields()[0].serialize(DELETE_EXISTING, dos);
+ }
tb.addFieldEndOffset();
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index a4b4012..3231162 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.operators;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -33,21 +33,21 @@ import
org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
public class LSMSecondaryUpsertOperatorDescriptor extends
LSMTreeInsertDeleteOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final int[] prevValuePermutation;
- protected final int upsertIndicatorFieldIndex;
- protected final IBinaryBooleanInspectorFactory
upsertIndicatorInspectorFactory;
+ protected final int operationFieldIndex;
+ protected final IBinaryIntegerInspectorFactory operationInspectorFactory;
public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory
indexHelperFactory,
ITupleFilterFactory tupleFilterFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
- int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory
upsertIndicatorInspectorFactory,
+ int operationFieldIndex, IBinaryIntegerInspectorFactory
operationInspectorFactory,
int[] prevValuePermutation) {
super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT,
indexHelperFactory, tupleFilterFactory, false,
modificationOpCallbackFactory);
this.prevValuePermutation = prevValuePermutation;
- this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
- this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory;
+ this.operationFieldIndex = operationFieldIndex;
+ this.operationInspectorFactory = operationInspectorFactory;
}
@Override
@@ -55,7 +55,7 @@ public class LSMSecondaryUpsertOperatorDescriptor extends
LSMTreeInsertDeleteOpe
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
RecordDescriptor intputRecDesc =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition,
indexHelperFactory, modCallbackFactory,
- tupleFilterFactory, fieldPermutation, intputRecDesc,
upsertIndicatorFieldIndex,
- upsertIndicatorInspectorFactory, prevValuePermutation);
+ tupleFilterFactory, fieldPermutation, intputRecDesc,
operationFieldIndex, operationInspectorFactory,
+ prevValuePermutation);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index b588323..482be06 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -19,14 +19,13 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.TypeTagUtil;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,38 +47,32 @@ import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDel
* -If old secondary index tuple == new secondary index tuple
* --do nothing
* -else
- * --If any old field is null/missing?
- * ---do nothing
- * --else
- * ---delete old secondary index tuple
- * --If any new field is null/missing?
- * ---do nothing
- * --else
- * ---insert new secondary index tuple
+ * --perform the operation based on the operation kind
*/
public class LSMSecondaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDeleteOperatorNodePushable {
- private final PermutingFrameTupleReference prevValueTuple = new
PermutingFrameTupleReference();
+ protected static final int UPSERT_NEW =
LSMPrimaryUpsertOperatorNodePushable.UPSERT_NEW.getByteValue();
+ protected static final int UPSERT_EXISTING =
LSMPrimaryUpsertOperatorNodePushable.UPSERT_EXISTING.getByteValue();
+ protected static final int DELETE_EXISTING =
LSMPrimaryUpsertOperatorNodePushable.DELETE_EXISTING.getByteValue();
+
+ private final PermutingFrameTupleReference prevTuple = new
PermutingFrameTupleReference();
private final int numberOfFields;
- private final boolean isPrimaryKeyIndex;
- protected final int upsertIndicatorFieldIndex;
- protected final IBinaryBooleanInspector upsertIndicatorInspector;
+ protected final int operationFieldIndex;
+ protected final IBinaryIntegerInspector operationInspector;
protected AbstractIndexModificationOperationCallback abstractModCallback;
public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int
partition,
IIndexDataflowHelperFactory indexHelperFactory,
IModificationOperationCallbackFactory modCallbackFactory,
ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation,
RecordDescriptor inputRecDesc,
- int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory
upsertIndicatorInspectorFactory,
- int[] prevValuePermutation) throws HyracksDataException {
+ int operationFieldIndex, IBinaryIntegerInspectorFactory
operationInspectorFactory,
+ int[] prevTuplePermutation) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation,
inputRecDesc, IndexOperation.UPSERT,
modCallbackFactory, tupleFilterFactory);
- this.prevValueTuple.setFieldPermutation(prevValuePermutation);
- this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
- this.upsertIndicatorInspector =
upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx);
+ this.prevTuple.setFieldPermutation(prevTuplePermutation);
+ this.operationFieldIndex = operationFieldIndex;
+ this.operationInspector =
operationInspectorFactory.createBinaryIntegerInspector(ctx);
this.numberOfFields = fieldPermutation.length;
- // a primary key index only has primary keys, and thus these two
permutations are the same
- this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation,
prevValuePermutation);
}
@Override
@@ -97,36 +90,24 @@ public class LSMSecondaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdate
for (int i = 0; i < tupleCount; i++) {
try {
frameTuple.reset(accessor, i);
- boolean isUpsert =
-
upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
-
frameTuple.getFieldStart(upsertIndicatorFieldIndex),
-
frameTuple.getFieldLength(upsertIndicatorFieldIndex));
- // if both previous value and new value are null, then we skip
+ int operation =
operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+ frameTuple.getFieldStart(operationFieldIndex),
frameTuple.getFieldLength(operationFieldIndex));
tuple.reset(accessor, i);
- prevValueTuple.reset(accessor, i);
+ prevTuple.reset(accessor, i);
- boolean newTupleHasNullOrMissing = hasNullOrMissing(tuple);
- boolean oldTupleHasNullOrMissing =
hasNullOrMissing(prevValueTuple);
- if (newTupleHasNullOrMissing && oldTupleHasNullOrMissing) {
- // No op
- continue;
- }
- // At least, one is not null
- if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple,
prevValueTuple, numberOfFields)) {
- // For a secondary index, if the secondary key values do
not change, we can skip upserting it.
- // However, for a primary key index, we cannot do this
because it only contains primary keys
- // which are always the same
- continue;
- }
- // if all old fields are known values, then delete. skip
deleting if any is null or missing
- if (!oldTupleHasNullOrMissing) {
- abstractModCallback.setOp(Operation.DELETE);
- lsmAccessor.forceDelete(prevValueTuple);
- }
- // if all new fields are known values, then insert. skip
inserting if any is null or missing
- if (isUpsert && !newTupleHasNullOrMissing) {
+ if (operation == UPSERT_NEW) {
abstractModCallback.setOp(Operation.INSERT);
lsmAccessor.forceInsert(tuple);
+ } else if (operation == UPSERT_EXISTING) {
+ if (!TupleUtils.equalTuples(tuple, prevTuple,
numberOfFields)) {
+ abstractModCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(prevTuple);
+ abstractModCallback.setOp(Operation.INSERT);
+ lsmAccessor.forceInsert(tuple);
+ }
+ } else if (operation == DELETE_EXISTING) {
+ abstractModCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(prevTuple);
}
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
index d077987..bbf0af1 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.asterix.runtime.operators;
import java.util.List;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -39,11 +39,11 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorDescriptor extends LSMSecon
public LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(JobSpecification
spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory
indexHelperFactory,
- IModificationOperationCallbackFactory modCallbackFactory, int
upsertIndicatorFieldIndex,
- IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
- List<AlgebricksPipeline> secondaryKeysPipeline,
List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
- super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null,
modCallbackFactory,
- upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory,
null);
+ IModificationOperationCallbackFactory modCallbackFactory, int
operationFieldIndex,
+ IBinaryIntegerInspectorFactory operationInspectorFactory,
List<AlgebricksPipeline> secondaryKeysPipeline,
+ List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
+ super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null,
modCallbackFactory, operationFieldIndex,
+ operationInspectorFactory, null);
this.secondaryKeysPipeline = secondaryKeysPipeline;
this.prevSecondaryKeysPipeline = prevSecondaryKeysPipeline;
}
@@ -53,7 +53,7 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorDescriptor extends LSMSecon
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
RecordDescriptor inputRecDesc =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(ctx,
partition, indexHelperFactory,
- modCallbackFactory, fieldPermutation, inputRecDesc,
upsertIndicatorFieldIndex,
- upsertIndicatorInspectorFactory, secondaryKeysPipeline,
prevSecondaryKeysPipeline);
+ modCallbackFactory, fieldPermutation, inputRecDesc,
operationFieldIndex, operationInspectorFactory,
+ secondaryKeysPipeline, prevSecondaryKeysPipeline);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 303bece..3fe3e85 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
@@ -48,12 +48,11 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
public
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(IHyracksTaskContext ctx,
int partition,
IIndexDataflowHelperFactory indexHelperFactory,
IModificationOperationCallbackFactory modCallbackFactory,
- int[] fieldPermutation, RecordDescriptor inputRecDesc, int
upsertIndicatorFieldIndex,
- IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
- List<AlgebricksPipeline> secondaryKeysPipeline,
List<AlgebricksPipeline> prevSecondaryKeysPipeline)
- throws HyracksDataException {
+ int[] fieldPermutation, RecordDescriptor inputRecDesc, int
operationFieldIndex,
+ IBinaryIntegerInspectorFactory operationInspectorFactory,
List<AlgebricksPipeline> secondaryKeysPipeline,
+ List<AlgebricksPipeline> prevSecondaryKeysPipeline) throws
HyracksDataException {
super(ctx, partition, indexHelperFactory, modCallbackFactory, null,
fieldPermutation, inputRecDesc,
- upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory,
null);
+ operationFieldIndex, operationInspectorFactory, null);
this.numberOfPrimaryKeyAndFilterFields = fieldPermutation.length;
this.startOfNewKeyPipelines =
buildStartOfPipelines(secondaryKeysPipeline, inputRecDesc, false);
this.startOfPrevKeyPipelines =
buildStartOfPipelines(prevSecondaryKeysPipeline, inputRecDesc, true);
@@ -110,9 +109,9 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
// Insert all of our new keys, if the PIDX operation was also an UPSERT (and not just a DELETE).
frameTuple.reset(accessor, i);
- if
(upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
- frameTuple.getFieldStart(upsertIndicatorFieldIndex),
- frameTuple.getFieldLength(upsertIndicatorFieldIndex))) {
+ int operation =
operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+ frameTuple.getFieldStart(operationFieldIndex),
frameTuple.getFieldLength(operationFieldIndex));
+ if (operation == UPSERT_NEW || operation == UPSERT_EXISTING) {
writeTupleToPipelineStarts(buffer, i, startOfNewKeyPipelines);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8540d0b..9bafe3e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -218,10 +218,10 @@ public interface IMetadataProvider<S, I> {
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexUpsertRuntime(
IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema
propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable>
primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, ILogicalExpression
filterExpr,
- LogicalVariable upsertIndicatorVar, List<LogicalVariable>
prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor
inputDesc, JobGenContext context,
- JobSpecification spec, List<List<AlgebricksPipeline>>
secondaryKeysPipelines) throws AlgebricksException;
+ List<LogicalVariable> additionalFilteringKeys, ILogicalExpression
filterExpr, LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, LogicalVariable
prevAdditionalFilteringKeys,
+ RecordDescriptor inputDesc, JobGenContext context,
JobSpecification spec,
+ List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws
AlgebricksException;
public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[]
inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr,
JobGenContext context)
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 7b6ed26..f2ac41a 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -45,7 +45,7 @@ import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit
* perform. In the case of bulk-loading, {@link #operation} will be INSERT and the {@link #bulkload} flag will be
* raised. {@link #additionalFilteringExpressions} and {@link #numberOfAdditionalNonFilteringFields} refers to the
* additionalFilteringExpressions, numberOfAdditionalNonFilteringFields found in the corresponding primary index
- * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #upsertIndicatorExpr} also originates from
+ * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #operationExpr} also originates from
* {@link InsertDeleteUpsertOperator}, and is only set when the operation is of kind UPSERT.
* <p>
*
@@ -84,7 +84,7 @@ public class IndexInsertDeleteUpsertOperator extends
AbstractOperatorWithNestedP
// used for upsert operations
private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
- private Mutable<ILogicalExpression> upsertIndicatorExpr;
+ private Mutable<ILogicalExpression> operationExpr;
private final int numberOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?>
dataSourceIndex,
@@ -129,8 +129,8 @@ public class IndexInsertDeleteUpsertOperator extends
AbstractOperatorWithNestedP
}
}
}
- // Upsert indicator var <For upsert>
- if (upsertIndicatorExpr != null &&
visitor.transform(upsertIndicatorExpr)) {
+ // Operation indicator var <For upsert>
+ if (operationExpr != null && visitor.transform(operationExpr)) {
b = true;
}
// Old secondary <For upsert>
@@ -177,8 +177,8 @@ public class IndexInsertDeleteUpsertOperator extends
AbstractOperatorWithNestedP
e.getValue().getUsedVariables(vars);
}
}
- if (getUpsertIndicatorExpr() != null) {
- getUpsertIndicatorExpr().getValue().getUsedVariables(vars);
+ if (getOperationExpr() != null) {
+ getOperationExpr().getValue().getUsedVariables(vars);
}
}
@@ -273,11 +273,11 @@ public class IndexInsertDeleteUpsertOperator extends
AbstractOperatorWithNestedP
return numberOfAdditionalNonFilteringFields;
}
- public Mutable<ILogicalExpression> getUpsertIndicatorExpr() {
- return upsertIndicatorExpr;
+ public Mutable<ILogicalExpression> getOperationExpr() {
+ return operationExpr;
}
- public void setUpsertIndicatorExpr(Mutable<ILogicalExpression>
upsertIndicatorExpr) {
- this.upsertIndicatorExpr = upsertIndicatorExpr;
+ public void setOperationExpr(Mutable<ILogicalExpression> operationExpr) {
+ this.operationExpr = operationExpr;
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index ce2f801..5a54c7e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -59,9 +59,9 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
// previous additional fields (for UPSERT)
private List<LogicalVariable> prevAdditionalNonFilteringVars;
private List<Object> prevAdditionalNonFilteringTypes;
- // a boolean variable that indicates whether it's a delete operation
(false) or upsert operation (true)
- private LogicalVariable upsertIndicatorVar;
- private Object upsertIndicatorVarType;
+ // int describing the upsert (e.g. upserting a new tuple or to an existing tuple or just deleting an existing one)
+ private LogicalVariable operationVar;
+ private Object operationVarType;
public InsertDeleteUpsertOperator(IDataSource<?> dataSource,
Mutable<ILogicalExpression> payloadExpr,
List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -88,7 +88,7 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
public void recomputeSchema() throws AlgebricksException {
schema = new ArrayList<LogicalVariable>();
if (operation == Kind.UPSERT) {
- schema.add(upsertIndicatorVar);
+ schema.add(operationVar);
// The upsert case also produces the previous record
schema.add(prevRecordVar);
if (additionalNonFilteringExpressions != null) {
@@ -103,7 +103,7 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
public void getProducedVariables(Collection<LogicalVariable>
producedVariables) {
if (operation == Kind.UPSERT) {
- producedVariables.add(upsertIndicatorVar);
+ producedVariables.add(operationVar);
producedVariables.add(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
producedVariables.addAll(prevAdditionalNonFilteringVars);
@@ -150,7 +150,7 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
@Override
public void propagateVariables(IOperatorSchema target,
IOperatorSchema... sources) {
if (operation == Kind.UPSERT) {
- target.addVariable(upsertIndicatorVar);
+ target.addVariable(operationVar);
target.addVariable(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
for (LogicalVariable var :
prevAdditionalNonFilteringVars) {
@@ -175,7 +175,7 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
public IVariableTypeEnvironment
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
PropagatingTypeEnvironment env =
createPropagatingAllInputsTypeEnvironment(ctx);
if (operation == Kind.UPSERT) {
- env.setVarType(upsertIndicatorVar, upsertIndicatorVarType);
+ env.setVarType(operationVar, operationVarType);
env.setVarType(prevRecordVar, prevRecordType);
if (prevAdditionalNonFilteringVars != null) {
for (int i = 0; i < prevAdditionalNonFilteringVars.size();
i++) {
@@ -229,20 +229,20 @@ public class InsertDeleteUpsertOperator extends
AbstractLogicalOperator {
this.prevRecordVar = prevRecordVar;
}
- public LogicalVariable getUpsertIndicatorVar() {
- return upsertIndicatorVar;
+ public LogicalVariable getOperationVar() {
+ return operationVar;
}
- public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) {
- this.upsertIndicatorVar = upsertIndicatorVar;
+ public void setOperationVar(LogicalVariable operationVar) {
+ this.operationVar = operationVar;
}
- public Object getUpsertIndicatorVarType() {
- return upsertIndicatorVarType;
+ public Object getOperationVarType() {
+ return operationVarType;
}
- public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) {
- this.upsertIndicatorVarType = upsertIndicatorVarType;
+ public void setOperationVarType(Object operationVarType) {
+ this.operationVarType = operationVarType;
}
public void setPrevRecordType(Object recordType) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index a9f9626..c4b4e47 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -617,7 +617,7 @@ public class IsomorphismOperatorVisitor implements
ILogicalOperatorVisitor<Boole
|| !Objects.equals(op.getPrevSecondaryKeyExprs(),
insertOpArg.getPrevSecondaryKeyExprs())
|| !Objects.equals(op.getPrevAdditionalFilteringExpression(),
insertOpArg.getPrevAdditionalFilteringExpression())
- || !Objects.equals(op.getUpsertIndicatorExpr(),
insertOpArg.getUpsertIndicatorExpr())
+ || !Objects.equals(op.getOperationExpr(),
insertOpArg.getOperationExpr())
|| (op.getNumberOfAdditionalNonFilteringFields() != insertOpArg
.getNumberOfAdditionalNonFilteringFields())) {
return Boolean.FALSE;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 6e8b425..5aa63ae 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -421,8 +421,8 @@ public class SubstituteVariableVisitor
Pair<LogicalVariable, LogicalVariable> pair) throws
AlgebricksException {
boolean producedVarFound = false;
if (op.getOperation() == InsertDeleteUpsertOperator.Kind.UPSERT) {
- if (op.getUpsertIndicatorVar() != null &&
op.getUpsertIndicatorVar().equals(pair.first)) {
- op.setUpsertIndicatorVar(pair.second);
+ if (op.getOperationVar() != null &&
op.getOperationVar().equals(pair.first)) {
+ op.setOperationVar(pair.second);
producedVarFound = true;
} else if (op.getBeforeOpRecordVar() != null &&
op.getBeforeOpRecordVar().equals(pair.first)) {
op.setPrevRecordVar(pair.second);
@@ -453,7 +453,7 @@ public class SubstituteVariableVisitor
substUsedVariablesInExpr(op.getSecondaryKeyExpressions(), pair.first,
pair.second);
substUsedVariablesInExpr(op.getFilterExpression(), pair.first,
pair.second);
substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(),
pair.first, pair.second);
- substUsedVariablesInExpr(op.getUpsertIndicatorExpr(), pair.first,
pair.second);
+ substUsedVariablesInExpr(op.getOperationExpr(), pair.first,
pair.second);
substUsedVariablesInExpr(op.getPrevSecondaryKeyExprs(), pair.first,
pair.second);
substUsedVariablesInExpr(op.getPrevAdditionalFilteringExpression(),
pair.first, pair.second);
if (!op.getNestedPlans().isEmpty()) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 23fe3b2..4fb30b4 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -429,8 +429,8 @@ public class UsedVariableVisitor implements
ILogicalOperatorVisitor<Void, Void>
e.getValue().getUsedVariables(usedVariables);
}
}
- if (op.getUpsertIndicatorExpr() != null) {
-
op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables);
+ if (op.getOperationExpr() != null) {
+ op.getOperationExpr().getValue().getUsedVariables(usedVariables);
}
visitNestedPlans(op);
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 2d9dabe..26581b1 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -57,14 +57,14 @@ public class IndexInsertDeleteUpsertPOperator extends
AbstractPhysicalOperator {
private final ILogicalExpression filterExpr;
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<LogicalVariable> additionalFilteringKeys;
- private final LogicalVariable upsertIndicatorVar;
+ private final LogicalVariable operationVar;
private final List<LogicalVariable> prevSecondaryKeys;
private final LogicalVariable prevAdditionalFilteringKey;
private final int numOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys,
Mutable<ILogicalExpression> filterExpr,
- IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable
upsertIndicatorVar,
+ IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable
operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable
prevAdditionalFilteringKey,
int numOfAdditionalNonFilteringFields) {
this.primaryKeys = primaryKeys;
@@ -76,7 +76,7 @@ public class IndexInsertDeleteUpsertPOperator extends
AbstractPhysicalOperator {
}
this.dataSourceIndex = dataSourceIndex;
this.additionalFilteringKeys = additionalFilteringKeys;
- this.upsertIndicatorVar = upsertIndicatorVar;
+ this.operationVar = operationVar;
this.prevSecondaryKeys = prevSecondaryKeys;
this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
this.numOfAdditionalNonFilteringFields =
numOfAdditionalNonFilteringFields;
@@ -157,7 +157,7 @@ public class IndexInsertDeleteUpsertPOperator extends
AbstractPhysicalOperator {
break;
case UPSERT:
runtimeAndConstraints =
mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys,
additionalFilteringKeys, filterExpr, upsertIndicatorVar,
+ typeEnv, primaryKeys, secondaryKeys,
additionalFilteringKeys, filterExpr, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKey,
inputDesc, context, spec, secondaryKeyPipelines);
break;
default:
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index b95971f..cd0d996 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -428,11 +428,11 @@ public class SetAlgebricksPhysicalOperatorsRule
implements IAlgebraicRewriteRule
return new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
additionalFilteringKeys,
opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex());
} else {
- LogicalVariable upsertIndicatorVar = null;
+ LogicalVariable operationVar = null;
List<LogicalVariable> prevSecondaryKeys = null;
LogicalVariable prevAdditionalFilteringKey = null;
if (opInsDel.getOperation() == Kind.UPSERT) {
- upsertIndicatorVar =
getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+ operationVar =
getKey(opInsDel.getOperationExpr().getValue());
prevSecondaryKeys = new ArrayList<>();
getKeys(opInsDel.getPrevSecondaryKeyExprs(),
prevSecondaryKeys);
if (opInsDel.getPrevAdditionalFilteringExpression() !=
null) {
@@ -442,9 +442,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements
IAlgebraicRewriteRule
}
}
return new IndexInsertDeleteUpsertPOperator(primaryKeys,
secondaryKeys, additionalFilteringKeys,
- opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex(), upsertIndicatorVar,
- prevSecondaryKeys, prevAdditionalFilteringKey,
- opInsDel.getNumberOfAdditionalNonFilteringFields());
+ opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex(), operationVar, prevSecondaryKeys,
+ prevAdditionalFilteringKey,
opInsDel.getNumberOfAdditionalNonFilteringFields());
}
}