This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6308c3716b20e9cf91a9c641f92fe0004d67d150 Author: Ali Alsuliman <[email protected]> AuthorDate: Tue Jan 16 17:38:11 2024 -0800 [ASTERIXDB-3340][EXT] Use sorted cluster locations for external data sources - user model changes: no - storage format changes: no - interface changes: yes Details: When specifying the constraints (cluster locations) of an external data source, use a constraint that is sorted by node name (sorted cluster locations) so that it matches the constraints used when creating an internal dataset. Otherwise, when an internal dataset is used with an external dataset (e.g. UNION ALL), the query will fail due to incorrect partition assignment. Change-Id: I5c7753d53c377d0c5e286b3bfea6b2358abf6d66 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18099 Tested-by: Jenkins <[email protected]> Integration-Tests: Ian Maxon <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../app/function/TPCDSAllTablesDataGeneratorDatasource.java | 2 +- .../app/function/TPCDSSingleTableDataGeneratorDatasource.java | 2 +- .../org/apache/asterix/common/cluster/IClusterStateManager.java | 5 +++++ .../reader/abstracts/AbstractExternalInputStreamFactory.java | 4 ++-- .../src/main/java/org/apache/asterix/external/util/HDFSUtils.java | 2 +- .../org/apache/asterix/metadata/declared/FunctionDataSource.java | 8 +++----- .../org/apache/asterix/runtime/utils/ClusterStateManager.java | 7 +++++++ 7 files changed, 20 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java index 82303fcf00..430ba36106 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java @@ -64,6 +64,6 @@ public class TPCDSAllTablesDataGeneratorDatasource extends FunctionDataSource { @Override protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - return new AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations()); + return csm.getSortedClusterLocations(); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java index 458bff6f6b..67319e332a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java @@ -68,6 +68,6 @@ public class TPCDSSingleTableDataGeneratorDatasource extends FunctionDataSource @Override protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - return new AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations()); + return csm.getSortedClusterLocations(); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 76802d9a1b..fa623929dd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -171,6 +171,11 @@ public interface IClusterStateManager { */ AlgebricksAbsolutePartitionConstraint getClusterLocations(); + /** + * @return the constraint representing all the partitions of the cluster sorted by node name + */ + 
AlgebricksAbsolutePartitionConstraint getSortedClusterLocations(); + /** * @param excludePendingRemoval * true, if the desired set shouldn't have pending removal nodes diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java index eac4835ff7..9760c55a68 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java @@ -70,8 +70,8 @@ public abstract class AbstractExternalInputStreamFactory implements IInputStream public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException { this.configuration = configuration; - this.partitionConstraint = - ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations(); + this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager() + .getSortedClusterLocations(); } public static class PartitionWorkLoadBasedOnSize implements Serializable { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index 3506216e10..f4edafcb5f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -265,7 +265,7 @@ public class HDFSUtils { public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(IApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint clusterLocations) { if (clusterLocations == null) { - return ((ICcApplicationContext) appCtx).getClusterStateManager().getClusterLocations(); + return ((ICcApplicationContext) appCtx).getClusterStateManager().getSortedClusterLocations(); } return clusterLocations; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java index 6b9b03bcaa..3b6bd9d55a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java @@ -20,9 +20,7 @@ package org.apache.asterix.metadata.declared; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.functions.FunctionSignature; @@ -116,9 +114,9 @@ public abstract class FunctionDataSource extends DataSource { AlgebricksAbsolutePartitionConstraint locations); protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - String[] allPartitions = csm.getClusterLocations().getLocations(); - Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions)); - return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()])); + String[] sortedLocations = csm.getSortedClusterLocations().getLocations(); + return new AlgebricksAbsolutePartitionConstraint( + Arrays.stream(sortedLocations).distinct().toArray(String[]::new)); } protected IDataParserFactory createDataParserFactory() { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index d3c87ff341..1ff62a9807 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -289,6 +289,13 @@ public class ClusterStateManager implements IClusterStateManager { return clusterPartitionConstraint; } + @Override + public synchronized AlgebricksAbsolutePartitionConstraint getSortedClusterLocations() { + String[] clone = getClusterLocations().getLocations().clone(); + Arrays.sort(clone); + return new AlgebricksAbsolutePartitionConstraint(clone); + } + private synchronized void resetClusterPartitionConstraint() { ArrayList<String> clusterActiveLocations = new ArrayList<>(); for (ClusterPartition p : clusterPartitions.values()) {
