This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 30d1fa121c [ASTERIXDB-3340][EXT] Use proper cluster locations based on
partitioning scheme
30d1fa121c is described below
commit 30d1fa121c926434504d7c3691338cd28bf5aa0e
Author: Ali Alsuliman <[email protected]>
AuthorDate: Fri Jan 26 12:43:59 2024 -0800
[ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning
scheme
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
When specifying the constraints (cluster locations) of
an external data source:
Non-cloud deployment (dynamic partitioning):
use constraints that are sorted by node
name (sorted cluster locations) so that they match the constraints used
when creating an internal dataset.
Cloud deployment (static partitioning):
use constraints that are based on the partitioning map.
Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
---
.../org/apache/asterix/app/function/DumpIndexDatasource.java | 2 +-
.../org/apache/asterix/app/function/QueryIndexDatasource.java | 2 +-
.../app/function/TPCDSAllTablesDataGeneratorDatasource.java | 4 ++--
.../app/function/TPCDSSingleTableDataGeneratorDatasource.java | 4 ++--
.../apache/asterix/common/cluster/IClusterStateManager.java | 2 +-
.../asterix/common/dataflow/IDataPartitioningProvider.java | 5 +++++
.../reader/abstracts/AbstractExternalInputStreamFactory.java | 4 ++--
.../main/java/org/apache/asterix/external/util/HDFSUtils.java | 2 +-
.../apache/asterix/metadata/declared/FunctionDataSource.java | 6 +++---
.../apache/asterix/metadata/declared/MetadataProvider.java | 5 +++++
.../metadata/utils/DynamicDataPartitioningProvider.java | 6 ++++++
.../metadata/utils/StaticDataPartitioningProvider.java | 11 +++++++++--
.../org/apache/asterix/runtime/utils/ClusterStateManager.java | 2 +-
13 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index bd72a66d66..1eac011a6a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -54,7 +54,7 @@ public class DumpIndexDatasource extends FunctionDataSource {
}
@Override
- protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm, MetadataProvider md) {
return constraint;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index b3889fd8ed..52054d6d1d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -80,7 +80,7 @@ public class QueryIndexDatasource extends FunctionDataSource {
}
@Override
- protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm, MetadataProvider md) {
return storageLocations;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 430ba36106..b4d1b03ab2 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -63,7 +63,7 @@ public class TPCDSAllTablesDataGeneratorDatasource extends
FunctionDataSource {
}
@Override
- protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
- return csm.getSortedClusterLocations();
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm, MetadataProvider md) {
+ return md.getDataPartitioningProvider().getClusterLocations();
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 67319e332a..3bbaee2079 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -67,7 +67,7 @@ public class TPCDSSingleTableDataGeneratorDatasource extends
FunctionDataSource
}
@Override
- protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
- return csm.getSortedClusterLocations();
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm, MetadataProvider md) {
+ return md.getDataPartitioningProvider().getClusterLocations();
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index ffa6c872f1..a5d503b8c3 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -173,7 +173,7 @@ public interface IClusterStateManager {
/**
* @return the constraint representing all the partitions of the cluster
sorted by node name
*/
- AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+ AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations();
/**
* @param excludePendingRemoval true, if the desired set shouldn't have
pending removal nodes
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index e59d4e730e..828715d68b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -18,5 +18,10 @@
*/
package org.apache.asterix.common.dataflow;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
public interface IDataPartitioningProvider {
+
+ AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ac8859bd6e..06662e57de 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@ public abstract class AbstractExternalInputStreamFactory
implements IInputStream
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
- this.partitionConstraint = ((ICcApplicationContext)
ctx.getApplicationContext()).getClusterStateManager()
- .getSortedClusterLocations();
+ this.partitionConstraint = ((ICcApplicationContext)
ctx.getApplicationContext()).getDataPartitioningProvider()
+ .getClusterLocations();
this.filterEvaluatorFactory = filterEvaluatorFactory;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e92448cdde..f7638b4696 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -252,7 +252,7 @@ public class HDFSUtils {
public static AlgebricksAbsolutePartitionConstraint
getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
- return ((ICcApplicationContext)
appCtx).getClusterStateManager().getSortedClusterLocations();
+ return ((ICcApplicationContext)
appCtx).getDataPartitioningProvider().getClusterLocations();
}
return clusterLocations;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index f6aa347d2f..91f7615cdc 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -110,7 +110,7 @@ public abstract class FunctionDataSource extends DataSource
{
adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
IClusterStateManager csm =
metadataProvider.getApplicationContext().getClusterStateManager();
FunctionDataSourceFactory factory =
- new FunctionDataSourceFactory(createFunction(metadataProvider,
getLocations(csm)));
+ new FunctionDataSourceFactory(createFunction(metadataProvider,
getLocations(csm, metadataProvider)));
IDataParserFactory dataParserFactory = createDataParserFactory();
dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
dataParserFactory.configure(Collections.emptyMap());
@@ -126,8 +126,8 @@ public abstract class FunctionDataSource extends DataSource
{
protected abstract IDatasourceFunction createFunction(MetadataProvider
metadataProvider,
AlgebricksAbsolutePartitionConstraint locations);
- protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
- String[] sortedLocations =
csm.getSortedClusterLocations().getLocations();
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm, MetadataProvider md) {
+ String[] sortedLocations =
md.getDataPartitioningProvider().getClusterLocations().getLocations();
return new AlgebricksAbsolutePartitionConstraint(
Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 107cfb2cc5..8d8ca80810 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1010,9 +1010,14 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ //TODO(partitioning): should this be removed in favor of
getNodeSortedClusterLocations()?
return appCtx.getClusterStateManager().getClusterLocations();
}
+ public DataPartitioningProvider getDataPartitioningProvider() {
+ return dataPartitioningProvider;
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildExternalDataLookupRuntime(
JobSpecification jobSpec, Dataset dataset, int[] ridIndexes,
boolean retainInput,
IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema,
JobGenContext context,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 57392db4f0..1a1c8ac829 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -24,6 +24,7 @@ import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,4 +65,9 @@ public class DynamicDataPartitioningProvider extends
DataPartitioningProvider {
int[][] partitionsMap =
getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
return PartitioningProperties.of(splitsAndConstraints.first,
splitsAndConstraints.second, partitionsMap);
}
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ return clusterStateManager.getNodeSortedClusterLocations();
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 7206a5084a..44141cb1cd 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -30,6 +30,7 @@ import
org.apache.asterix.common.cluster.SplitComputeLocations;
import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -50,10 +51,10 @@ public class StaticDataPartitioningProvider extends
DataPartitioningProvider {
@Override
public PartitioningProperties getPartitioningProperties(String
databaseName) {
- SplitComputeLocations dataverseSplits = getSplits(databaseName);
+ SplitComputeLocations databaseSplits = getSplits(databaseName);
StorageComputePartitionsMap partitionMap =
clusterStateManager.getStorageComputeMap();
int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
- return PartitioningProperties.of(dataverseSplits.getSplitsProvider(),
dataverseSplits.getConstraints(),
+ return PartitioningProperties.of(databaseSplits.getSplitsProvider(),
databaseSplits.getConstraints(),
partitionsMap);
}
@@ -127,4 +128,10 @@ public class StaticDataPartitioningProvider extends
DataPartitioningProvider {
new
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
return new SplitComputeLocations(splitProvider, constraints);
}
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ SplitComputeLocations locations =
getSplits(MetadataConstants.DEFAULT_DATABASE);
+ return (AlgebricksAbsolutePartitionConstraint)
locations.getConstraints();
+ }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0a10204200..444d91bd32 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -302,7 +302,7 @@ public class ClusterStateManager implements
IClusterStateManager {
}
@Override
- public synchronized AlgebricksAbsolutePartitionConstraint
getSortedClusterLocations() {
+ public synchronized AlgebricksAbsolutePartitionConstraint
getNodeSortedClusterLocations() {
String[] clone = getClusterLocations().getLocations().clone();
Arrays.sort(clone);
return new AlgebricksAbsolutePartitionConstraint(clone);