This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 6308c3716b20e9cf91a9c641f92fe0004d67d150
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue Jan 16 17:38:11 2024 -0800

    [ASTERIXDB-3340][EXT] Use sorted cluster locations for external data sources
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    When specifying the constraints (cluster locations) of
    an external data source, use a constraint that is sorted on the node
    names (sorted cluster locations) so that it matches the constraint used
    when creating an internal dataset. Otherwise, when an internal dataset
    is used with an external dataset (e.g. UNION ALL), the query will fail
    due to incorrect partition assignment.
    
    Change-Id: I5c7753d53c377d0c5e286b3bfea6b2358abf6d66
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18099
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Ian Maxon <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
---
 .../app/function/TPCDSAllTablesDataGeneratorDatasource.java       | 2 +-
 .../app/function/TPCDSSingleTableDataGeneratorDatasource.java     | 2 +-
 .../org/apache/asterix/common/cluster/IClusterStateManager.java   | 5 +++++
 .../reader/abstracts/AbstractExternalInputStreamFactory.java      | 4 ++--
 .../src/main/java/org/apache/asterix/external/util/HDFSUtils.java | 2 +-
 .../org/apache/asterix/metadata/declared/FunctionDataSource.java  | 8 +++-----
 .../org/apache/asterix/runtime/utils/ClusterStateManager.java     | 7 +++++++
 7 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 82303fcf00..430ba36106 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -64,6 +64,6 @@ public class TPCDSAllTablesDataGeneratorDatasource extends 
FunctionDataSource {
 
     @Override
     protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        return new 
AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+        return csm.getSortedClusterLocations();
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 458bff6f6b..67319e332a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -68,6 +68,6 @@ public class TPCDSSingleTableDataGeneratorDatasource extends 
FunctionDataSource
 
     @Override
     protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        return new 
AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+        return csm.getSortedClusterLocations();
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 76802d9a1b..fa623929dd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -171,6 +171,11 @@ public interface IClusterStateManager {
      */
     AlgebricksAbsolutePartitionConstraint getClusterLocations();
 
+    /**
+     * @return the constraint representing all the partitions of the cluster 
sorted by node name
+     */
+    AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+
     /**
      * @param excludePendingRemoval
      *            true, if the desired set shouldn't have pending removal nodes
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index eac4835ff7..9760c55a68 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@ public abstract class AbstractExternalInputStreamFactory 
implements IInputStream
     public void configure(IServiceContext ctx, Map<String, String> 
configuration, IWarningCollector warningCollector)
             throws AlgebricksException {
         this.configuration = configuration;
-        this.partitionConstraint =
-                ((ICcApplicationContext) 
ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
+        this.partitionConstraint = ((ICcApplicationContext) 
ctx.getApplicationContext()).getClusterStateManager()
+                .getSortedClusterLocations();
     }
 
     public static class PartitionWorkLoadBasedOnSize implements Serializable {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 3506216e10..f4edafcb5f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -265,7 +265,7 @@ public class HDFSUtils {
     public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
-            return ((ICcApplicationContext) 
appCtx).getClusterStateManager().getClusterLocations();
+            return ((ICcApplicationContext) 
appCtx).getClusterStateManager().getSortedClusterLocations();
         }
         return clusterLocations;
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 6b9b03bcaa..3b6bd9d55a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -20,9 +20,7 @@ package org.apache.asterix.metadata.declared;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -116,9 +114,9 @@ public abstract class FunctionDataSource extends DataSource 
{
             AlgebricksAbsolutePartitionConstraint locations);
 
     protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        String[] allPartitions = csm.getClusterLocations().getLocations();
-        Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
-        return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new 
String[ncs.size()]));
+        String[] sortedLocations = 
csm.getSortedClusterLocations().getLocations();
+        return new AlgebricksAbsolutePartitionConstraint(
+                
Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
     }
 
     protected IDataParserFactory createDataParserFactory() {
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff341..1ff62a9807 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -289,6 +289,13 @@ public class ClusterStateManager implements 
IClusterStateManager {
         return clusterPartitionConstraint;
     }
 
+    @Override
+    public synchronized AlgebricksAbsolutePartitionConstraint 
getSortedClusterLocations() {
+        String[] clone = getClusterLocations().getLocations().clone();
+        Arrays.sort(clone);
+        return new AlgebricksAbsolutePartitionConstraint(clone);
+    }
+
     private synchronized void resetClusterPartitionConstraint() {
         ArrayList<String> clusterActiveLocations = new ArrayList<>();
         for (ClusterPartition p : clusterPartitions.values()) {

Reply via email to