This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 46db92c16d [ASTERIXDB-3318][RUN] Create index builder factory per partition
46db92c16d is described below

commit 46db92c16dd1b315d939fcfae428703d9c06b1e6
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Nov 22 09:36:11 2023 -0800

    [ASTERIXDB-3318][RUN] Create index builder factory per partition
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    The LSMIndexIOOperationCallbackFactory is currently shared among
    index builder factories. Because LSMIndexIOOperationCallbackFactory
    is stateful, this causes an issue when index builders run
    concurrently and without synchronization.
    
    Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/metadata/utils/DatasetUtil.java | 35 +++++++++++++++++-----
 .../metadata/utils/SampleOperationsHelper.java     | 13 +++-----
 .../utils/SecondaryTreeIndexOperationsHelper.java  | 16 ++++------
 .../tests/am/btree/AbstractBTreeOperatorTest.java  |  8 +++--
 .../tests/am/rtree/AbstractRTreeOperatorTest.java  |  8 +++--
 .../dataflow/IndexCreateOperatorDescriptor.java    | 11 +++----
 6 files changed, 55 insertions(+), 36 deletions(-)

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 93802c2e47..43f40ebe56 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -339,7 +339,8 @@ public class DatasetUtil {
 
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
-        FileSplit[] fs = 
partitioningProperties.getSplitsProvider().getFileSplits();
+        IFileSplitProvider splitsProvider = 
partitioningProperties.getSplitsProvider();
+        FileSplit[] fs = splitsProvider.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (FileSplit f : fs) {
             sb.append(f).append(" ");
@@ -349,19 +350,37 @@ public class DatasetUtil {
                 DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
         // prepare a LocalResourceMetadata which will be stored in NC's local 
resource
         // repository
-        IResourceFactory resourceFactory = 
dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
-                compactionInfo.first, compactionInfo.second);
-        IndexBuilderFactory indexBuilderFactory =
-                new 
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        partitioningProperties.getSplitsProvider(), 
resourceFactory, true);
-        IndexCreateOperatorDescriptor indexCreateOp = new 
IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
-                partitioningProperties.getComputeStorageMap());
+        int[][] computeStorageMap = 
partitioningProperties.getComputeStorageMap();
+        IndexBuilderFactory[][] indexBuilderFactories = 
getIndexBuilderFactories(dataset, metadataProvider, index,
+                itemType, metaItemType, splitsProvider, compactionInfo.first, 
compactionInfo.second, computeStorageMap);
+        IndexCreateOperatorDescriptor indexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
computeStorageMap);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexCreateOp,
                 partitioningProperties.getConstraints());
         spec.addRoot(indexCreateOp);
         return spec;
     }
 
+    public static IndexBuilderFactory[][] getIndexBuilderFactories(Dataset 
dataset, MetadataProvider metadataProvider,
+            Index index, ARecordType itemType, ARecordType metaItemType, 
IFileSplitProvider fileSplitProvider,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties,
+            int[][] computeStorageMap) throws AlgebricksException {
+        IndexBuilderFactory[][] indexBuilderFactories = new 
IndexBuilderFactory[computeStorageMap.length][];
+        for (int i = 0; i < computeStorageMap.length; i++) {
+            int len = computeStorageMap[i].length;
+            indexBuilderFactories[i] = new IndexBuilderFactory[len];
+            for (int k = 0; k < len; k++) {
+                IResourceFactory resourceFactory = 
dataset.getResourceFactory(metadataProvider, index, itemType,
+                        metaItemType, mergePolicyFactory, 
mergePolicyProperties);
+                IndexBuilderFactory indexBuilderFactory =
+                        new 
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                                fileSplitProvider, resourceFactory, true);
+                indexBuilderFactories[i][k] = indexBuilderFactory;
+            }
+        }
+        return indexBuilderFactories;
+    }
+
     public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, 
String datasetName,
             MetadataProvider metadataProvider) throws AlgebricksException {
         DataverseName dataverseName = dataverse.getDataverseName();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 3dd84ec573..e87466386d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -27,7 +27,6 @@ import java.util.Set;
 
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -81,14 +80,12 @@ import 
org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import 
org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import 
org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
@@ -158,13 +155,11 @@ public class SampleOperationsHelper implements 
ISecondaryIndexOperationsHelper {
     @Override
     public JobSpecification buildCreationJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        IResourceFactory resourceFactory = 
dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
-                mergePolicyFactory, mergePolicyProperties);
-        IIndexBuilderFactory indexBuilderFactory = new 
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
-                fileSplitProvider, resourceFactory, true);
+        IndexBuilderFactory[][] indexBuilderFactories =
+                DatasetUtil.getIndexBuilderFactories(dataset, 
metadataProvider, sampleIdx, itemType, metaType,
+                        fileSplitProvider, mergePolicyFactory, 
mergePolicyProperties, computeStorageMap);
         IndexCreateOperatorDescriptor indexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
computeStorageMap);
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
computeStorageMap);
         indexCreateOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexCreateOp, partitionConstraint);
         spec.addRoot(indexCreateOp);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 0cda6259e0..62352bf879 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@ import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDes
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.PartitioningProperties;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -34,14 +33,12 @@ import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.common.IResourceFactory;
 
 public abstract class SecondaryTreeIndexOperationsHelper extends 
SecondaryIndexOperationsHelper {
 
@@ -53,14 +50,13 @@ public abstract class SecondaryTreeIndexOperationsHelper 
extends SecondaryIndexO
     @Override
     public JobSpecification buildCreationJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        IResourceFactory resourceFactory = 
dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
-                mergePolicyFactory, mergePolicyProperties);
-        IIndexBuilderFactory indexBuilderFactory = new 
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
-                secondaryFileSplitProvider, resourceFactory, true);
         PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
-        IndexCreateOperatorDescriptor secondaryIndexCreateOp = new 
IndexCreateOperatorDescriptor(spec,
-                indexBuilderFactory, 
partitioningProperties.getComputeStorageMap());
+        int[][] computeStorageMap = 
partitioningProperties.getComputeStorageMap();
+        IndexBuilderFactory[][] indexBuilderFactories =
+                DatasetUtil.getIndexBuilderFactories(dataset, 
metadataProvider, index, itemType, metaType,
+                        secondaryFileSplitProvider, mergePolicyFactory, 
mergePolicyProperties, computeStorageMap);
+        IndexCreateOperatorDescriptor secondaryIndexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
computeStorageMap);
         secondaryIndexCreateOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index d90a212097..4948a66b11 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -120,8 +120,10 @@ public abstract class AbstractBTreeOperatorTest extends 
AbstractIntegrationTest
         IResourceFactory primaryResourceFactory = 
createPrimaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, 
primaryResourceFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new 
IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -168,8 +170,10 @@ public abstract class AbstractBTreeOperatorTest extends 
AbstractIntegrationTest
         IResourceFactory secondaryResourceFactory = 
createSecondaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, 
secondarySplitProvider, secondaryResourceFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new 
IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 1634b2d345..a8f2ef1383 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -242,8 +242,10 @@ public abstract class AbstractRTreeOperatorTest extends 
AbstractIntegrationTest
                 pageManagerFactory, null, null);
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, 
btreeFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new 
IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -305,8 +307,10 @@ public abstract class AbstractRTreeOperatorTest extends 
AbstractIntegrationTest
         JobSpecification spec = new JobSpecification();
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, 
secondarySplitProvider, rtreeFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new 
IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, 
TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 61b600a9af..32636e8dba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -31,13 +31,13 @@ import 
org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 public class IndexCreateOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 2L;
-    private final IIndexBuilderFactory indexBuilderFactory;
+    private final IIndexBuilderFactory[][] indexBuilderFactories;
     private final int[][] partitionsMap;
 
-    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, 
IIndexBuilderFactory indexBuilderFactory,
-            int[][] partitionsMap) {
+    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexBuilderFactory[][] indexBuilderFactories, int[][] 
partitionsMap) {
         super(spec, 0, 0);
-        this.indexBuilderFactory = indexBuilderFactory;
+        this.indexBuilderFactories = indexBuilderFactories;
         this.partitionsMap = partitionsMap;
     }
 
@@ -45,9 +45,10 @@ public class IndexCreateOperatorDescriptor extends 
AbstractSingleActivityOperato
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         int[] storagePartitions = partitionsMap[partition];
+        IIndexBuilderFactory[] partitionIndexBuilderFactories = 
indexBuilderFactories[partition];
         IIndexBuilder[] indexBuilders = new 
IIndexBuilder[storagePartitions.length];
         for (int i = 0; i < storagePartitions.length; i++) {
-            indexBuilders[i] = indexBuilderFactory.create(ctx, 
storagePartitions[i]);
+            indexBuilders[i] = partitionIndexBuilderFactories[i].create(ctx, 
storagePartitions[i]);
         }
         return new IndexCreateOperatorNodePushable(indexBuilders);
     }

Reply via email to