This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ed0b5a97c821004dbaf1b6c2dd3358f3de464fd2 Author: Lokesh Jain <[email protected]> AuthorDate: Fri Mar 21 11:01:20 2025 +0530 [HUDI-9120] Remove HUDI-9130 code changes relating to FGR (#12935) --- .../hudi/table/action/compact/HoodieCompactor.java | 2 +- .../MultipleSparkJobExecutionStrategy.java | 2 +- .../hudi/common/config/HoodieReaderConfig.java | 19 ------- .../hudi/common/table/TestHoodieReaderConfig.java | 66 ---------------------- .../hudi/hadoop/utils/HoodieInputFormatUtils.java | 39 +++---------- .../main/scala/org/apache/hudi/DefaultSource.scala | 5 +- .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 12 ++-- 7 files changed, 21 insertions(+), 124 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 1dc9a3742ab..bca2dfe26c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -146,7 +146,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable { boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader() // the engine needs to support fg reader first && !metaClient.isMetadataTable() - && HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig().getTableVersion(), config) + && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED) && operationType == WriteOperationType.COMPACT && !hasBootstrapFile(operations) // bootstrap file read for fg reader is not ready && config.populateMetaFields(); // Virtual key support by fg reader is not ready diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index c5b8957556d..747c47b987b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -371,7 +371,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T> Schema tableSchemaWithMetaFields) { List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream() .map(ClusteringOperation::create).collect(Collectors.toList()); - boolean canUseFileGroupReaderBasedClustering = HoodieReaderConfig.isFileGroupReaderEnabled(getHoodieTable().getMetaClient().getTableConfig().getTableVersion(), getWriteConfig()) + boolean canUseFileGroupReaderBasedClustering = getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED) && getWriteConfig().getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) && clusteringOps.stream().allMatch(slice -> StringUtils.isNullOrEmpty(slice.getBootstrapFilePath())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java index f7e1d904896..56935fc5d81 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java @@ -19,13 +19,8 @@ package org.apache.hudi.common.config; -import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.storage.StorageConfiguration; - import javax.annotation.concurrent.Immutable; -import java.util.Map; - /** * Configurations for reading a file group */ @@ -94,18 +89,4 @@ public class HoodieReaderConfig extends HoodieConfig { "hoodie.write.record.merge.custom.implementation.classes"; public static final String RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY = "hoodie.datasource.write.record.merger.impls"; - - public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, HoodieConfig config) { - return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED); - } - - public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, Map<String, String> parameters) { - return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) - && Boolean.parseBoolean(parameters.getOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString())); - } - - public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, StorageConfiguration conf) { - return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) - && conf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue()); - } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieReaderConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieReaderConfig.java deleted file mode 100644 index 97f07c56bca..00000000000 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieReaderConfig.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.table; - -import org.apache.hudi.common.config.HoodieConfig; -import org.apache.hudi.common.config.HoodieReaderConfig; -import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; - -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestHoodieReaderConfig { - - @Test - public void testIsFileGroupReaderEnabled() { - HoodieConfig config = new HoodieConfig(); - // tbl version 8 by default allows file group reader - assertTrue(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, config)); - // tbl version 6 does not allow file group reader - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.SIX, config)); - config.setValue(FILE_GROUP_READER_ENABLED, "false"); - // tbl version 8 does not allow file group reader when config is set to false - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, config)); - - Map<String, String> map = new HashMap<>(); - // tbl version 8 by default allows file group reader - assertTrue(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, map)); - // tbl version 6 does not allow file group reader - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.SIX, map)); - map.put(FILE_GROUP_READER_ENABLED.key(), "false"); - // tbl version 8 does not allow file group reader when config is set to false - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, map)); - - HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration()); - // tbl version 8 by default allows file group reader - assertTrue(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, conf)); - // tbl version 6 does not allow file group reader - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.SIX, conf)); - conf.set(FILE_GROUP_READER_ENABLED.key(), "false"); - // tbl version 8 does not allow file group reader when config is set to false - assertFalse(HoodieReaderConfig.isFileGroupReaderEnabled(HoodieTableVersion.EIGHT, conf)); - } -} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 7a067fa19ac..e298703fd27 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -52,7 +52,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; import org.apache.hadoop.conf.Configuration; @@ -70,7 +69,6 @@ import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.lib.CombineFileSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.slf4j.Logger; @@ -94,7 +92,6 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE; import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded; import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; -import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; public class HoodieInputFormatUtils { @@ -529,39 +526,19 @@ public class HoodieInputFormatUtils { return realtimeSplit.getBasePath(); } else { Path inputPath = ((FileSplit) split).getPath(); - return getTablePath(jobConf, inputPath); + FileSystem fs = inputPath.getFileSystem(jobConf); + HoodieStorage storage = new HoodieHadoopStorage(fs); + Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, convertToStoragePath(inputPath)); + return tablePath.get().toString(); } } - private static String getTablePath(JobConf jobConf, Path inputPath) throws IOException { - FileSystem fs = inputPath.getFileSystem(jobConf); - HoodieStorage storage = new HoodieHadoopStorage(fs); - Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, convertToStoragePath(inputPath)); - return tablePath.get().toString(); - } - /** * `schema.on.read` and skip merge not implemented */ - public static boolean shouldUseFilegroupReader(final JobConf jobConf, final InputSplit split) throws IOException { - if (split instanceof FileSplit || split instanceof RealtimeSplit) { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(getStorageConf(jobConf)).setBasePath(getTableBasePath(split, jobConf)).build(); - return HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig().getTableVersion(), new HadoopStorageConfiguration(jobConf)) - && !jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) - && !(split instanceof BootstrapBaseFileSplit); - } else if (split instanceof CombineFileSplit) { - for (Path path : ((CombineFileSplit) split).getPaths()) { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(getStorageConf(jobConf)).setBasePath(getTablePath(jobConf, path)).build(); - boolean isFileGroupReaderEnabled = HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig().getTableVersion(), new HadoopStorageConfiguration(jobConf)) - && !jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) - && !(split instanceof BootstrapBaseFileSplit); - if (!isFileGroupReaderEnabled) { - return false; - } - } - return true; - } else { - return false; - } + public static boolean shouldUseFilegroupReader(final JobConf jobConf, final InputSplit split) { + return jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue()) + && !jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) + && !(split instanceof BootstrapBaseFileSplit); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 5bdd76a8136..a0d7ed059dd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -304,8 +304,9 @@ object DefaultSource { Option(schema) } - val useNewParquetFileFormat = !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty) && - HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig.getTableVersion, parameters.asJava) + val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean && + !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty) if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema))) } else if (isCdcQuery) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 4f4854d497c..f72d0ec5037 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -634,11 +634,15 @@ def testBulkInsertForDropPartitionColumn(): Unit = { */ @ParameterizedTest @CsvSource(value = Array( - "COPY_ON_WRITE", - "MERGE_ON_READ" + "COPY_ON_WRITE,6", + "COPY_ON_WRITE,8", + "MERGE_ON_READ,6", + "MERGE_ON_READ,8" )) - def testSchemaEvolutionForTableType(tableType: String): Unit = { - val opts = getCommonParams(tempPath, hoodieFooTableName, tableType) + def testSchemaEvolutionForTableType(tableType: String, tableVersion: Int): Unit = { + var opts = getCommonParams(tempPath, hoodieFooTableName, tableType) + opts = opts + (HoodieTableConfig.VERSION.key() -> tableVersion.toString, + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString) // Create new table // NOTE: We disable Schema Reconciliation by default (such that Writer's
