This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d1fa121c [ASTERIXDB-3340][EXT] Use proper cluster locations based on 
partitioning scheme
30d1fa121c is described below

commit 30d1fa121c926434504d7c3691338cd28bf5aa0e
Author: Ali Alsuliman <[email protected]>
AuthorDate: Fri Jan 26 12:43:59 2024 -0800

    [ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning 
scheme
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    When specifying the constraints (cluster locations) of
    an external data source:
    
    Non-cloud deployment (Dynamic partitioning):
    use a constraint that is sorted on the node
    names (sorted cluster locations) so that it matches the constraint used
    when creating an internal dataset.
    
    Cloud deployment (Static partitioning):
    use a constraint that is based on the partitioning map.
    
    Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
---
 .../org/apache/asterix/app/function/DumpIndexDatasource.java  |  2 +-
 .../org/apache/asterix/app/function/QueryIndexDatasource.java |  2 +-
 .../app/function/TPCDSAllTablesDataGeneratorDatasource.java   |  4 ++--
 .../app/function/TPCDSSingleTableDataGeneratorDatasource.java |  4 ++--
 .../apache/asterix/common/cluster/IClusterStateManager.java   |  2 +-
 .../asterix/common/dataflow/IDataPartitioningProvider.java    |  5 +++++
 .../reader/abstracts/AbstractExternalInputStreamFactory.java  |  4 ++--
 .../main/java/org/apache/asterix/external/util/HDFSUtils.java |  2 +-
 .../apache/asterix/metadata/declared/FunctionDataSource.java  |  6 +++---
 .../apache/asterix/metadata/declared/MetadataProvider.java    |  5 +++++
 .../metadata/utils/DynamicDataPartitioningProvider.java       |  6 ++++++
 .../metadata/utils/StaticDataPartitioningProvider.java        | 11 +++++++++--
 .../org/apache/asterix/runtime/utils/ClusterStateManager.java |  2 +-
 13 files changed, 39 insertions(+), 16 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index bd72a66d66..1eac011a6a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -54,7 +54,7 @@ public class DumpIndexDatasource extends FunctionDataSource {
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm, MetadataProvider md) {
         return constraint;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index b3889fd8ed..52054d6d1d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -80,7 +80,7 @@ public class QueryIndexDatasource extends FunctionDataSource {
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm, MetadataProvider md) {
         return storageLocations;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 430ba36106..b4d1b03ab2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -63,7 +63,7 @@ public class TPCDSAllTablesDataGeneratorDatasource extends 
FunctionDataSource {
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        return csm.getSortedClusterLocations();
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm, MetadataProvider md) {
+        return md.getDataPartitioningProvider().getClusterLocations();
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 67319e332a..3bbaee2079 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -67,7 +67,7 @@ public class TPCDSSingleTableDataGeneratorDatasource extends 
FunctionDataSource
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        return csm.getSortedClusterLocations();
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm, MetadataProvider md) {
+        return md.getDataPartitioningProvider().getClusterLocations();
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index ffa6c872f1..a5d503b8c3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -173,7 +173,7 @@ public interface IClusterStateManager {
     /**
      * @return the constraint representing all the partitions of the cluster 
sorted by node name
      */
-    AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+    AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations();
 
     /**
      * @param excludePendingRemoval true, if the desired set shouldn't have 
pending removal nodes
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index e59d4e730e..828715d68b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -18,5 +18,10 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
 public interface IDataPartitioningProvider {
+
+    AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ac8859bd6e..06662e57de 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@ public abstract class AbstractExternalInputStreamFactory 
implements IInputStream
     public void configure(IServiceContext ctx, Map<String, String> 
configuration, IWarningCollector warningCollector,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws 
AlgebricksException, HyracksDataException {
         this.configuration = configuration;
-        this.partitionConstraint = ((ICcApplicationContext) 
ctx.getApplicationContext()).getClusterStateManager()
-                .getSortedClusterLocations();
+        this.partitionConstraint = ((ICcApplicationContext) 
ctx.getApplicationContext()).getDataPartitioningProvider()
+                .getClusterLocations();
         this.filterEvaluatorFactory = filterEvaluatorFactory;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e92448cdde..f7638b4696 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -252,7 +252,7 @@ public class HDFSUtils {
     public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
-            return ((ICcApplicationContext) 
appCtx).getClusterStateManager().getSortedClusterLocations();
+            return ((ICcApplicationContext) 
appCtx).getDataPartitioningProvider().getClusterLocations();
         }
         return clusterLocations;
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index f6aa347d2f..91f7615cdc 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -110,7 +110,7 @@ public abstract class FunctionDataSource extends DataSource 
{
         adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         IClusterStateManager csm = 
metadataProvider.getApplicationContext().getClusterStateManager();
         FunctionDataSourceFactory factory =
-                new FunctionDataSourceFactory(createFunction(metadataProvider, 
getLocations(csm)));
+                new FunctionDataSourceFactory(createFunction(metadataProvider, 
getLocations(csm, metadataProvider)));
         IDataParserFactory dataParserFactory = createDataParserFactory();
         dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         dataParserFactory.configure(Collections.emptyMap());
@@ -126,8 +126,8 @@ public abstract class FunctionDataSource extends DataSource 
{
     protected abstract IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
             AlgebricksAbsolutePartitionConstraint locations);
 
-    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        String[] sortedLocations = 
csm.getSortedClusterLocations().getLocations();
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm, MetadataProvider md) {
+        String[] sortedLocations = 
md.getDataPartitioningProvider().getClusterLocations().getLocations();
         return new AlgebricksAbsolutePartitionConstraint(
                 
Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 107cfb2cc5..8d8ca80810 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1010,9 +1010,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        //TODO(partitioning): should this be removed and 
getSortedClusterLocations() is used instead?
         return appCtx.getClusterStateManager().getClusterLocations();
     }
 
+    public DataPartitioningProvider getDataPartitioningProvider() {
+        return dataPartitioningProvider;
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDataLookupRuntime(
             JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, 
boolean retainInput,
             IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, 
JobGenContext context,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 57392db4f0..1a1c8ac829 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -24,6 +24,7 @@ import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,4 +65,9 @@ public class DynamicDataPartitioningProvider extends 
DataPartitioningProvider {
         int[][] partitionsMap = 
getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
         return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
     }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        return clusterStateManager.getNodeSortedClusterLocations();
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 7206a5084a..44141cb1cd 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -30,6 +30,7 @@ import 
org.apache.asterix.common.cluster.SplitComputeLocations;
 import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -50,10 +51,10 @@ public class StaticDataPartitioningProvider extends 
DataPartitioningProvider {
 
     @Override
     public PartitioningProperties getPartitioningProperties(String 
databaseName) {
-        SplitComputeLocations dataverseSplits = getSplits(databaseName);
+        SplitComputeLocations databaseSplits = getSplits(databaseName);
         StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
         int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
-        return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), 
dataverseSplits.getConstraints(),
+        return PartitioningProperties.of(databaseSplits.getSplitsProvider(), 
databaseSplits.getConstraints(),
                 partitionsMap);
     }
 
@@ -127,4 +128,10 @@ public class StaticDataPartitioningProvider extends 
DataPartitioningProvider {
                 new 
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
         return new SplitComputeLocations(splitProvider, constraints);
     }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        SplitComputeLocations locations = 
getSplits(MetadataConstants.DEFAULT_DATABASE);
+        return (AlgebricksAbsolutePartitionConstraint) 
locations.getConstraints();
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0a10204200..444d91bd32 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -302,7 +302,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
     }
 
     @Override
-    public synchronized AlgebricksAbsolutePartitionConstraint 
getSortedClusterLocations() {
+    public synchronized AlgebricksAbsolutePartitionConstraint 
getNodeSortedClusterLocations() {
         String[] clone = getClusterLocations().getLocations().clone();
         Arrays.sort(clone);
         return new AlgebricksAbsolutePartitionConstraint(clone);

Reply via email to