This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new fa38fd6b85 [ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for
Location Constraints Configuration in Delta
fa38fd6b85 is described below
commit fa38fd6b858cbd93e8ee21dc48346228f5363f26
Author: ayush.tripathi <[email protected]>
AuthorDate: Fri Jan 10 15:43:46 2025 +0530
[ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for Location
Constraints Configuration in Delta
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-64800
Change-Id: I9ba2ce7afc54ddd08f5e522627c56e44f51cdbbc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293
Integration-Tests: Jenkins <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../external_dataset/ExternalDatasetTestUtils.java | 5 +
.../deltalake/DeltaTableGenerator.java | 131 ++++++++
.../deltalake-file-nine.00.ddl.sqlpp | 35 +++
.../deltalake-file-nine.01.query.sqlpp | 22 ++
.../deltalake-file-one.00.ddl.sqlpp | 35 +++
.../deltalake-file-one.01.query.sqlpp | 22 ++
.../deltalake-file-nine/deltalake-file-nine.01.adm | 9 +
.../deltalake-file-one/deltalake-file-one.01.adm | 1 +
.../runtimets/testsuite_external_dataset_s3.xml | 12 +
.../reader/aws/delta/DeltaReaderFactory.java | 31 +-
.../awss3/DeltaTopicPartitionDistributionTest.java | 342 +++++++++++++++++++++
11 files changed, 623 insertions(+), 22 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c50b391fa2..7e7b6f5d67 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -440,6 +440,11 @@ public class ExternalDatasetTestUtils {
loadDeltaDirectory(generatedDataBasePath,
"/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath,
"/delta_all_type/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/delta_all_type",
PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath,
"/delta_file_size_nine/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine",
PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath,
"/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one",
PARQUET_FILTER, "delta-data/");
+
}
private static void loadDeltaDirectory(String dataBasePath, String
rootPath, FilenameFilter filter,
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 1236636054..67d460c943 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -55,6 +55,10 @@ public class DeltaTableGenerator {
"target" + File.separatorChar + "generated_delta_files" +
File.separatorChar + "modified_delta_table";
public static final String DELTA_MULTI_FILE_TABLE =
"target" + File.separatorChar + "generated_delta_files" +
File.separatorChar + "multiple_file_delta_table";
+ /** Relative output directory of the generated Delta table holding a single committed Parquet data file. */
+ public static final String DELTA_FILE_SIZE_ONE =
+ "target" + File.separatorChar + "generated_delta_files" +
File.separatorChar + "delta_file_size_one";
+ /** Relative output directory of the generated Delta table built from nine committed Parquet data files. */
+ public static final String DELTA_FILE_SIZE_NINE =
+ "target" + File.separatorChar + "generated_delta_files" +
File.separatorChar + "delta_file_size_nine";
public static void prepareDeltaTableContainer(Configuration conf) {
File basePath = new File(".");
@@ -62,6 +66,8 @@ public class DeltaTableGenerator {
prepareMultipleFilesTable(conf);
prepareModifiedTable(conf);
prepareEmptyTable(conf);
+ prepareFileSizeOne(conf);
+ prepareFileSizeNine(conf);
}
public static void cleanBinaryDirectory(File localDataRoot, String
binaryFilesPath) {
@@ -221,4 +227,129 @@ public class DeltaTableGenerator {
throw new RuntimeException(e);
}
}
+
+    /**
+     * Generates the {@link #DELTA_FILE_SIZE_ONE} table: one Parquet file ({@code firstFile.parquet}) holding the
+     * single record {@code {id: 0, name: "Cooper"}}, committed together with the table metadata as a
+     * CREATE_TABLE operation.
+     *
+     * @param conf Hadoop configuration used for both the Parquet writer and the Delta log
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing the table
+     */
+    public static void prepareFileSizeOne(Configuration conf) {
+        Schema schema =
+                SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+        try {
+            Path path = new Path(DELTA_FILE_SIZE_ONE, "firstFile.parquet");
+            GenericData.Record record = new GenericData.Record(schema);
+            record.put("id", 0);
+            record.put("name", "Cooper");
+
+            long size;
+            // try-with-resources: the writer is closed even if write() fails
+            try (ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build()) {
+                writer.write(record);
+                // size is read before the writer is closed
+                size = writer.getDataSize();
+            }
+
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_ONE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Generates the {@link #DELTA_FILE_SIZE_NINE} table from nine single-record Parquet files. The first file
+     * ({@code firstFile.parquet}) is committed together with the table metadata as a CREATE_TABLE operation.
+     * Each of {@code File2.parquet} .. {@code File9.parquet} is then committed in its own WRITE transaction.
+     * The record in the (k + 1)-th file has {@code id = k}.
+     *
+     * @param conf Hadoop configuration used for both the Parquet writers and the Delta log
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing the table
+     */
+    public static void prepareFileSizeNine(Configuration conf) {
+        Schema schema =
+                SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+        // names[k] is the "name" of the record with id k
+        String[] names = { "Cooper", "Adam", "Third", "Fourth", "Five", "Six", "Seven", "Eight", "Nine" };
+        try {
+            long firstSize = writeDeltaFileSizeNineRecord(conf, schema, "firstFile.parquet", 0, names[0]);
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), firstSize,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_NINE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+            // one extra data file, and one WRITE commit, per remaining record: File2 holds id 1, ..., File9 id 8
+            for (int id = 1; id < names.length; id++) {
+                String fileName = "File" + (id + 1) + ".parquet";
+                long size = writeDeltaFileSizeNineRecord(conf, schema, fileName, id, names[id]);
+                List<Action> fileActions = List.of(new AddFile(fileName, new HashMap<>(), size,
+                        System.currentTimeMillis(), true, null, null));
+                OptimisticTransaction writeTxn = log.startTransaction();
+                writeTxn.commit(fileActions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Writes a single {@code {id, name}} record to {@code fileName} under {@link #DELTA_FILE_SIZE_NINE}.
+     * Returns the writer's data size, read before the writer is closed. The writer is always closed.
+     */
+    private static long writeDeltaFileSizeNineRecord(Configuration conf, Schema schema, String fileName, int id,
+            String name) throws IOException {
+        GenericData.Record record = new GenericData.Record(schema);
+        record.put("id", id);
+        record.put("name", name);
+        Path path = new Path(DELTA_FILE_SIZE_NINE, fileName);
+        try (ParquetWriter<GenericData.Record> writer =
+                AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build()) {
+            writer.write(record);
+            return writer.getDataSize();
+        }
+    }
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
new file mode 100644
index 0000000000..c1a74c5054
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING
%adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_file_size_nine"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
new file mode 100644
index 0000000000..db2abf5bed
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds order by id;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
new file mode 100644
index 0000000000..1284e933a3
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING
%adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_file_size_one"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
new file mode 100644
index 0000000000..84e79142a8
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
new file mode 100644
index 0000000000..500f6a9970
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Cooper" }
+{ "id": 1, "name": "Adam" }
+{ "id": 2, "name": "Third" }
+{ "id": 3, "name": "Fourth" }
+{ "id": 4, "name": "Five" }
+{ "id": 5, "name": "Six" }
+{ "id": 6, "name": "Seven" }
+{ "id": 7, "name": "Eight" }
+{ "id": 8, "name": "Nine" }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
new file mode 100644
index 0000000000..006681c6f3
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
@@ -0,0 +1 @@
+{ "id": 0, "name": "Cooper" }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ff1b325c9f..c31cb5e7c4 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -555,6 +555,18 @@
<expected-error>ASX1108: External source error.
io.delta.kernel.exceptions.TableNotFoundException: Delta table at path
`s3a://playground/delta-data/s1` is not found.</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-file-one">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-file-one</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-file-nine">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-file-nine</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index dc4c310660..790db8c24d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -44,7 +43,6 @@ import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.mapred.JobConf;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -77,6 +75,10 @@ public abstract class DeltaReaderFactory implements
IRecordReaderFactory<Object>
protected final List<PartitionWorkLoadBasedOnSize>
partitionWorkLoadsBasedOnSize = new ArrayList<>();
protected ConfFactory confFactory;
+ /**
+ * Returns the per-partition scan-file workloads held by this factory.
+ * The factory's own mutable list is returned directly (no copy), so callers should treat it as read-only.
+ * NOTE(review): presumably populated by distributeFiles (the unit tests read it right after calling that
+ * method); confirm before relying on it elsewhere.
+ */
+ public List<PartitionWorkLoadBasedOnSize>
getPartitionWorkLoadsBasedOnSize() {
+ return partitionWorkLoadsBasedOnSize;
+ }
+
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
return locationConstraints;
@@ -134,9 +136,9 @@ public abstract class DeltaReaderFactory implements
IRecordReaderFactory<Object>
scanFiles.add(row);
}
}
- locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+ locationConstraints = getPartitions(appCtx);
configuration.put(ExternalDataConstants.KEY_PARSER,
ExternalDataConstants.FORMAT_DELTA);
- distributeFiles(scanFiles);
+ distributeFiles(scanFiles,
getPartitionConstraint().getLocations().length);
issueWarnings(warnings, warningCollector);
}
@@ -151,26 +153,11 @@ public abstract class DeltaReaderFactory implements
IRecordReaderFactory<Object>
warnings.clear();
}
- private AlgebricksAbsolutePartitionConstraint
configureLocationConstraints(ICcApplicationContext appCtx,
- List<Row> scanFiles) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
-
- String[] locations = csm.getClusterLocations().getLocations();
- if (scanFiles.size() == 0) {
- return
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
- } else if (locations.length > scanFiles.size()) {
- LOGGER.debug(
- "configured partitions ({}) exceeds total partition count
({}); limiting configured partitions to total partition count",
- locations.length, scanFiles.size());
- final String[] locationCopy = locations.clone();
- ArrayUtils.shuffle(locationCopy);
- locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
- }
- return new AlgebricksAbsolutePartitionConstraint(locations);
+ /**
+ * Returns the location constraint for the Delta scan: all cluster locations reported by the
+ * application's data-partitioning provider.
+ * The constraint is not reduced to the number of scan files. When there are fewer files than locations,
+ * some partitions are expected to receive no files.
+ */
+ public AlgebricksAbsolutePartitionConstraint
getPartitions(ICcApplicationContext appCtx) {
+ return appCtx.getDataPartitioningProvider().getClusterLocations();
}
- private void distributeFiles(List<Row> scanFiles) {
- final int partitionsCount =
getPartitionConstraint().getLocations().length;
+ public void distributeFiles(List<Row> scanFiles, int partitionsCount) {
PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new
PriorityQueue<>(partitionsCount,
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
new file mode 100644
index 0000000000..1bc8eb857c
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.awss3;
+
+import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import
org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+public class DeltaTopicPartitionDistributionTest {
+
+ @Test
+ public void distributeFilesMoreFilesThanPartitions() {
+ int rowCount = 25;
+ int numberOfPartition = 13;
+ List<Row> scanFiles = createMockRows(rowCount);
+ DeltaReaderFactory d = new DeltaReaderFactory() {
+ @Override
+ protected void configureJobConf(JobConf conf, Map<String, String>
configuration)
+ throws AlgebricksException {
+
+ }
+
+ @Override
+ protected String getTablePath(Map<String, String> configuration)
throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return null;
+ }
+ };
+ d.distributeFiles(scanFiles, numberOfPartition);
+ Assert.assertEquals(numberOfPartition,
d.getPartitionWorkLoadsBasedOnSize().size());
+ verifyFileDistribution(scanFiles.size(),
d.getPartitionWorkLoadsBasedOnSize());
+ }
+
+ @Test
+ public void distributeFilesLessFilesThanPartitions() {
+ int rowCount = 15;
+ int numberOfPartition = 23;
+ List<Row> scanFiles = createMockRows(rowCount);
+ DeltaReaderFactory d = new DeltaReaderFactory() {
+ @Override
+ protected void configureJobConf(JobConf conf, Map<String, String>
configuration)
+ throws AlgebricksException {
+
+ }
+
+ @Override
+ protected String getTablePath(Map<String, String> configuration)
throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return null;
+ }
+ };
+ d.distributeFiles(scanFiles, numberOfPartition);
+ Assert.assertEquals(numberOfPartition,
d.getPartitionWorkLoadsBasedOnSize().size());
+ verifyFileDistribution(scanFiles.size(),
d.getPartitionWorkLoadsBasedOnSize());
+ }
+
+ @Test
+ public void distributeFilesEqualFilesAndPartitions() {
+ int rowCount = 9;
+ int numberOfPartition = 9;
+ List<Row> scanFiles = createMockRows(rowCount);
+ DeltaReaderFactory d = new DeltaReaderFactory() {
+ @Override
+ protected void configureJobConf(JobConf conf, Map<String, String>
configuration)
+ throws AlgebricksException {
+
+ }
+
+ @Override
+ protected String getTablePath(Map<String, String> configuration)
throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return null;
+ }
+ };
+ d.distributeFiles(scanFiles, numberOfPartition);
+ Assert.assertEquals(numberOfPartition,
d.getPartitionWorkLoadsBasedOnSize().size());
+ verifyFileDistribution(scanFiles.size(),
d.getPartitionWorkLoadsBasedOnSize());
+ }
+
+ private void verifyFileDistribution(int numberOfFiles,
+ List<DeltaReaderFactory.PartitionWorkLoadBasedOnSize> workloads) {
+ int totalDistributedFiles = 0;
+
+ for (DeltaReaderFactory.PartitionWorkLoadBasedOnSize workload :
workloads) {
+ totalDistributedFiles += workload.getScanFiles().size();
+ Assert.assertTrue(workload.getTotalSize() >= 0);
+ }
+ Assert.assertEquals(numberOfFiles, totalDistributedFiles);
+ }
+
+ private List<Row> createMockRows(int count) {
+ List<Row> rows = new ArrayList<>();
+ StructType sch = createMockSchema();
+
+ for (int i = 1; i <= count; i++) {
+ int finalI = i;
+ Row row = new Row() {
+
+ @Override
+ public StructType getSchema() {
+ return sch;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return false;
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return 0;
+ }
+
+ @Override
+ public short getShort(int i) {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int i) {
+ if (i == 1) {
+ return finalI;
+ } else if (i == 2) {
+ return finalI * 10;
+ }
+ return 0;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return 0;
+ }
+
+ @Override
+ public String getString(int i) {
+ if (i == 0) {
+ return "tableRoot_" + finalI;
+ } else if (i == 1) {
+ return "addFilePath_" + finalI;
+ }
+ return null;
+ }
+
+ @Override
+ public BigDecimal getDecimal(int i) {
+ return null;
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return new byte[0];
+ }
+
+ @Override
+ public ArrayValue getArray(int i) {
+ return null;
+ }
+
+ @Override
+ public MapValue getMap(int i) {
+ return null;
+ }
+
+ @Override
+ public Row getStruct(int index) {
+ if (index == ADD_FILE_ORDINAL) {
+ return createAddFileEntry(finalI);
+ }
+ return null;
+ }
+ };
+
+ rows.add(row);
+ }
+
+ return rows;
+ }
+
+ private StructType createMockSchema() {
+ List<StructField> fields = new ArrayList<>();
+
+ fields.add(new StructField("field1", StringType.STRING, true));
+ fields.add(new StructField("field2", IntegerType.INTEGER, true));
+ fields.add(new StructField("field3", IntegerType.INTEGER, true));
+
+ return new StructType(fields);
+ }
+
+ private Row createAddFileEntry(int i) {
+ List<StructField> addFileFields = new ArrayList<>();
+
+ addFileFields.add(new StructField("addFilePath", StringType.STRING,
true));
+ addFileFields.add(new StructField("size", IntegerType.INTEGER, true));
+
+ StructType addFileSchema = new StructType(addFileFields);
+
+ Row addFileRow = new Row() {
+ @Override
+ public StructType getSchema() {
+ return addFileSchema;
+ }
+
+ @Override
+ public boolean isNullAt(int index) {
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return false;
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return 0;
+ }
+
+ @Override
+ public short getShort(int i) {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int index) {
+ if (index == 1) {
+ return i * 100;
+ }
+ return 0;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return 0;
+ }
+
+ @Override
+ public BigDecimal getDecimal(int i) {
+ return null;
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return new byte[0];
+ }
+
+ @Override
+ public Row getStruct(int index) {
+ return null;
+ }
+
+ @Override
+ public ArrayValue getArray(int index) {
+ return null;
+ }
+
+ @Override
+ public MapValue getMap(int index) {
+ return null;
+ }
+
+ @Override
+ public String getString(int index) {
+ if (index == 0) {
+ return "addFilePath_" + i;
+ }
+ return null;
+ }
+ };
+
+ return addFileRow;
+ }
+}