This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new de51cc4799 [ASTERIXDB-3144][RT] Run correlated index bulkload at
storage parallelism
de51cc4799 is described below
commit de51cc479920e26e88f041d8c5d064764f788a42
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri May 5 21:22:32 2023 +0300
[ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To ensure correctness, always run correlated secondary index
bulkload jobs using storage parallelism regardless of the number
of compute partitions. This will ensure that each storage partition
will produce the corresponding secondary index components.
Change-Id: I5f38e4b06bcd91479bae544619bf3a96dde3c500
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17512
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../apache/asterix/metadata/utils/DatasetUtil.java | 15 ++++++++++--
.../SecondaryCorrelatedBTreeOperationsHelper.java | 3 ++-
...aryCorrelatedInvertedIndexOperationsHelper.java | 3 ++-
.../SecondaryCorrelatedRTreeOperationsHelper.java | 3 ++-
.../utils/SecondaryIndexOperationsHelper.java | 27 ++++++++++++++++++----
5 files changed, 42 insertions(+), 9 deletions(-)
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 130e39c357..e882129823 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -585,7 +585,20 @@ public class DatasetUtil {
MetadataProvider metadataProvider) throws AlgebricksException {
PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(dataset);
AlgebricksPartitionConstraint primaryPartitionConstraint =
partitioningProperties.getConstraints();
+ IOperatorDescriptor dummyKeyProviderOp =
createDummyKeyProviderOp(spec);
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
dummyKeyProviderOp,
+ primaryPartitionConstraint);
+ return dummyKeyProviderOp;
+ }
+ public static IOperatorDescriptor
createCorrelatedDummyKeyProviderOp(JobSpecification spec,
+ AlgebricksPartitionConstraint apc) throws AlgebricksException {
+ IOperatorDescriptor dummyKeyProviderOp =
createDummyKeyProviderOp(spec);
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
dummyKeyProviderOp, apc);
+ return dummyKeyProviderOp;
+ }
+
+ private static IOperatorDescriptor
createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
// Build dummy tuple containing one field with a dummy value inside.
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
DataOutput dos = tb.getDataOutput();
@@ -602,8 +615,6 @@ public class DatasetUtil {
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor keyProviderOp = new
ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
keyProviderOp,
- primaryPartitionConstraint);
return keyProviderOp;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index c59613781b..2b948ef623 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -75,7 +75,8 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends
SecondaryCorrelate
IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create dummy key provider for feeding the primary index scan.
- IOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+ IOperatorDescriptor keyProviderOp =
+ DatasetUtil.createCorrelatedDummyKeyProviderOp(spec,
primaryPartitionConstraint);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp =
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 00cc5950b3..cd3f01c85d 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -216,7 +216,8 @@ public class
SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC
IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create dummy key provider for feeding the primary index scan.
- IOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+ IOperatorDescriptor keyProviderOp =
+ DatasetUtil.createCorrelatedDummyKeyProviderOp(spec,
primaryPartitionConstraint);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp =
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index e26aab3731..302ad74a01 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -188,7 +188,8 @@ public class SecondaryCorrelatedRTreeOperationsHelper
extends SecondaryCorrelate
assert dataset.getDatasetType() == DatasetType.INTERNAL;
// Create dummy key provider for feeding the primary index scan.
- IOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+ IOperatorDescriptor keyProviderOp =
+ DatasetUtil.createCorrelatedDummyKeyProviderOp(spec,
primaryPartitionConstraint);
IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create primary index scan op.
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5d6c13c792..1a58423246 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -30,6 +30,7 @@ import
org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -74,6 +75,7 @@ import
org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -211,8 +213,9 @@ public abstract class SecondaryIndexOperationsHelper
implements ISecondaryIndexO
payloadSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
metaSerde =
metaType == null ? null :
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
- PartitioningProperties partitioningProperties =
- metadataProvider.getPartitioningProperties(dataset,
index.getIndexName());
+ PartitioningProperties partitioningProperties;
+ partitioningProperties =
+
getSecondaryIndexBulkloadPartitioningProperties(metadataProvider, dataset,
index.getIndexName());
secondaryFileSplitProvider =
partitioningProperties.getSpiltsProvider();
secondaryPartitionConstraint = partitioningProperties.getConstraints();
numPrimaryKeys = dataset.getPrimaryKeys().size();
@@ -223,8 +226,8 @@ public abstract class SecondaryIndexOperationsHelper
implements ISecondaryIndexO
} else {
numFilterFields = 0;
}
-
- PartitioningProperties datasetPartitioningProperties =
metadataProvider.getPartitioningProperties(dataset);
+ PartitioningProperties datasetPartitioningProperties =
getSecondaryIndexBulkloadPartitioningProperties(
+ metadataProvider, dataset, dataset.getDatasetName());
primaryFileSplitProvider =
datasetPartitioningProperties.getSpiltsProvider();
primaryPartitionConstraint =
datasetPartitioningProperties.getConstraints();
setPrimaryRecDescAndComparators();
@@ -527,4 +530,20 @@ public abstract class SecondaryIndexOperationsHelper
implements ISecondaryIndexO
public AlgebricksPartitionConstraint getSecondaryPartitionConstraint() {
return secondaryPartitionConstraint;
}
+
+ private PartitioningProperties
getSecondaryIndexBulkloadPartitioningProperties(MetadataProvider mp, Dataset
dataset,
+ String indexName) throws AlgebricksException {
+ PartitioningProperties partitioningProperties =
mp.getPartitioningProperties(dataset, indexName);
+ // special case for bulkloading secondary indexes for datasets with
correlated merge policy
+ // to ensure correctness, we will run it in as many locations as storage
partitions
+ // this will not be needed once ASTERIXDB-3176 is implemented
+ if (this instanceof SecondaryCorrelatedTreeIndexOperationsHelper) {
+ FileSplit[] fileSplits =
partitioningProperties.getSpiltsProvider().getFileSplits();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sp =
+
StoragePathUtil.splitProviderAndPartitionConstraints(fileSplits);
+ return PartitioningProperties.of(sp.getFirst(), sp.getSecond(),
+
DataPartitioningProvider.getOneToOnePartitionsMap(fileSplits.length));
+ }
+ return partitioningProperties;
+ }
}