This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 7a55ad341b6 [HUDI-6991] Fix hoodie.parquet.max.file.size conf reset error (#9924)
7a55ad341b6 is described below
commit 7a55ad341b69df4d2e04d56687e591612103c0b4
Author: ksmou <[email protected]>
AuthorDate: Thu Nov 2 13:44:30 2023 +0800
[HUDI-6991] Fix hoodie.parquet.max.file.size conf reset error (#9924)
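
    Root cause: when building the write config for clustering,
    hoodie.parquet.max.file.size was reset to the clustering group byte budget
    (getClusteringMaxBytesInGroup, i.e. hoodie.clustering.plan.strategy.max.bytes.per.group)
    rather than the per-file target (getClusteringTargetFileMaxBytes, i.e.
    hoodie.clustering.plan.strategy.target.file.max.bytes), so a single clustering
    output file could grow to the size budget of its whole group. The one-line fix,
    excerpted verbatim from the diff below:

        // Before: output files sized by the group's total byte budget.
        newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
            String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));

        // After: output files capped at the clustering target file size.
        newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
            String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
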
---
.../SparkSortAndSizeExecutionStrategy.java | 4 +-
.../functional/TestSparkSortAndSizeClustering.java | 167 +++++++++++++++++++++
2 files changed, 169 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 85ee7ec9d4b..843a638e4cf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -68,7 +68,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
@@ -92,7 +92,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
         newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
new file mode 100644
index 00000000000..b1e7765fc8b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class TestSparkSortAndSizeClustering extends HoodieSparkClientTestHarness {
+
+ private HoodieWriteConfig config;
+ private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0);
+
+ public void setup(int maxFileSize) throws IOException {
+ setup(maxFileSize, Collections.emptyMap());
+ }
+
+  public void setup(int maxFileSize, Map<String, String> options) throws IOException {
+ initPath();
+ initSparkContexts();
+ initTestDataGenerator();
+ initFileSystem();
+ Properties props = getPropertiesForKeyGen(true);
+ props.putAll(options);
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, props);
+ config = getConfigBuilder().withProps(props)
+ .withAutoCommit(false)
+        .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS)
+ .build())
+ .build();
+
+ writeClient = getHoodieWriteClient(config);
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ cleanupResources();
+ }
+
+ @Test
+ public void testClusteringWithRDD() throws IOException {
+ writeAndClustering(false);
+ }
+
+ @Test
+ public void testClusteringWithRow() throws IOException {
+ writeAndClustering(true);
+ }
+
+ public void writeAndClustering(boolean isRow) throws IOException {
+ setup(102400);
+    config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(isRow));
+ config.setValue("hoodie.metadata.enable", "false");
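+    // One recent partition is clustered (lookback = 1); groups are capped at
+    // 2 MB and output files target 1 MB, so the assertions below expect a
+    // single input group rewritten into two output file groups.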
+    config.setValue("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
+    config.setValue("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(1024 * 1024));
+    config.setValue("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(2 * 1024 * 1024));
+
+ int numRecords = 1000;
+ writeData(writeClient.createNewInstantTime(), numRecords, true);
+
+    String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get();
+    HoodieClusteringPlan plan = ClusteringUtils.getClusteringPlan(
+        metaClient, HoodieTimeline.getReplaceCommitRequestedInstant(clusteringTime)).map(Pair::getRight).get();
+
+ List<HoodieClusteringGroup> inputGroups = plan.getInputGroups();
+    Assertions.assertEquals(1, inputGroups.size(), "Clustering plan will contain 1 input group");
+
+    Integer outputFileGroups = plan.getInputGroups().get(0).getNumOutputFileGroups();
+    Assertions.assertEquals(2, outputFileGroups, "Clustering plan will generate 2 output groups");
+
+    HoodieWriteMetadata writeMetadata = writeClient.cluster(clusteringTime, true);
+    List<HoodieWriteStat> writeStats = (List<HoodieWriteStat>) writeMetadata.getWriteStats().get();
+    Assertions.assertEquals(2, writeStats.size(), "Clustering should write 2 files");
+
+ List<Row> rows = readRecords();
+ Assertions.assertEquals(numRecords, rows.size());
+ }
+
+  private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean doCommit) {
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, totalRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ writeClient.startCommitWithTime(commitTime);
+    List<WriteStatus> writeStatuses = writeClient.insert(writeRecords, commitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+
+ if (doCommit) {
+      Assertions.assertTrue(writeClient.commitStats(commitTime, context.parallelize(writeStatuses, 1),
+          writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(), metaClient.getCommitActionType()));
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+    return writeStatuses;
+ }
+
+ private List<Row> readRecords() {
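+    // Read all written records back via a partition-path glob and Spark SQL.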
+ Dataset<Row> roViewDF = sparkSession
+ .read()
+ .format("hudi")
+ .load(basePath + "/*/*/*/*");
+    roViewDF.createOrReplaceTempView("clustering_table");
+    return sparkSession.sqlContext().sql("select * from clustering_table").collectAsList();
+ }
+
+ public HoodieWriteConfig.Builder getConfigBuilder() {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withWriteStatusClass(MetadataMergeWriteStatus.class)
+ .forTable("clustering-table")
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ }
+}