This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit ab46437b29ee30f7430eea2610773956ae67adb2
Author: yuzelin <[email protected]>
AuthorDate: Fri May 9 17:12:15 2025 +0800

    [flink] Clone action support Hudi table

    This closes #5586
---
 .../apache/paimon/flink/clone/CloneFileInfo.java   |   2 +-
 .../org/apache/paimon/flink/clone/CloneUtils.java  |   2 +-
 .../paimon/flink/clone/ListCloneFilesFunction.java |  27 ++-
 paimon-hive/paimon-hive-catalog/pom.xml            |  29 +++
 .../hive/{migrate => clone}/HiveCloneUtils.java    |  53 ++++-
 .../{migrate => clone}/HivePartitionFiles.java     |   2 +-
 .../apache/paimon/hive/migrate/HiveMigrator.java   |   4 +-
 .../java/org/apache/paimon/hudi/FileIndex.java     | 253 +++++++++++++++++++++
 .../org/apache/paimon/hudi/HudiCloneUtils.java     |  78 +++++++
 paimon-hive/paimon-hive-connector-common/pom.xml   |  31 +++
 .../paimon/hudi/CloneActionForHudiITCase.java      | 195 ++++++++++++++++
 pom.xml                                            |   1 +
 12 files changed, 657 insertions(+), 20 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
index f0c3382b02..649a509d62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.clone;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HivePartitionFiles;

 import java.io.Serializable;
 import java.util.ArrayList;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
index 6dbe35ee85..5a53c39fa2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
@@ -23,7 +23,7 @@ import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CloneAction;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.hive.clone.HiveCloneUtils;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.StringUtils;

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
index 8f0592302f..40dd2d53c6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
@@ -23,8 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
-import org.apache.paimon.hive.migrate.HiveCloneUtils;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveCloneUtils;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -111,15 +111,20 @@ public class ListCloneFilesFunction

         PartitionPredicate predicate =
                 getPartitionPredicate(whereSql, table.schema().logicalPartitionType(), tuple.f0);
-        List<HivePartitionFiles> allPartitions =
-                HiveCloneUtils.listFiles(
-                        hiveCatalog,
-                        tuple.f0,
-                        table.schema().logicalPartitionType(),
-                        table.coreOptions().partitionDefaultName(),
-                        predicate);
-        for (HivePartitionFiles partitionFiles : allPartitions) {
-            CloneFileInfo.fromHive(tuple.f1, partitionFiles).forEach(collector::collect);
+        try {
+            List<HivePartitionFiles> allPartitions =
+                    HiveCloneUtils.listFiles(
+                            hiveCatalog,
+                            tuple.f0,
+                            table.schema().logicalPartitionType(),
+                            table.coreOptions().partitionDefaultName(),
+                            predicate);
+            for (HivePartitionFiles partitionFiles : allPartitions) {
+                CloneFileInfo.fromHive(tuple.f1, partitionFiles).forEach(collector::collect);
+            }
+        } catch (Exception e) {
+            throw new Exception(
+                    "Failed to list clone files for table " + tuple.f0.getFullName(), e);
         }
     }
diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index 5c49a76f1d..fc94856633 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -112,6 +112,35 @@ under the License.
             </exclusions>
         </dependency>

+        <!-- for hudi clone -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test -->

         <dependency>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
similarity index 85%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index 6f88cd32ea..b3e53cd8d3 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.paimon.hive.migrate;
+package org.apache.paimon.hive.clone;

 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hudi.HudiCloneUtils;
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -50,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;

@@ -115,7 +118,7 @@ public class HiveCloneUtils {
         }

         Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = client.getSchema(database, table);
+        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
         List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
         Map<String, String> hiveTableOptions = hiveTable.getParameters();

@@ -158,6 +161,23 @@
         return schemaBuilder.build();
     }

+    private static List<FieldSchema> getHiveSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception {
+        List<FieldSchema> fields = client.getSchema(database, table);
+        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
+            return fields;
+        }
+
+        Set<String> hudiMetadataFields =
+                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
+                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
+                        .collect(Collectors.toSet());
+        return fields.stream()
+                .filter(f -> !hudiMetadataFields.contains(f.getName()))
+                .collect(Collectors.toList());
+    }
+
     public static List<HivePartitionFiles> listFiles(
             HiveCatalog hiveCatalog,
             Identifier identifier,
@@ -168,6 +188,31 @@
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         Table sourceTable =
                 client.getTable(identifier.getDatabaseName(), identifier.getTableName());
+
+        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
+            return HudiCloneUtils.listFiles(
+                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
+        } else {
+            return listFromPureHiveTable(
+                    client,
+                    identifier,
+                    sourceTable,
+                    hiveCatalog.fileIO(),
+                    partitionRowType,
+                    defaultPartitionName,
+                    predicate);
+        }
+    }
+
+    private static List<HivePartitionFiles> listFromPureHiveTable(
+            IMetaStoreClient client,
+            Identifier identifier,
+            Table sourceTable,
+            FileIO fileIO,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
+            throws Exception {
         List<Partition> partitions =
                 client.listPartitions(
                         identifier.getDatabaseName(), identifier.getTableName(), Short.MAX_VALUE);
@@ -176,7 +221,7 @@
         if (partitions.isEmpty()) {
             String location = sourceTable.getSd().getLocation();
             return Collections.singletonList(
-                    listFiles(hiveCatalog.fileIO(), location, BinaryRow.EMPTY_ROW, format));
+                    listFiles(fileIO, location, BinaryRow.EMPTY_ROW, format));
         } else {
             List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
             partitionRowType
@@ -193,7 +238,7 @@
                                 valueSetters,
                                 defaultPartitionName);
                 if (predicate == null || predicate.test(partitionRow)) {
-                    results.add(listFiles(hiveCatalog.fileIO(), location, partitionRow, format));
+                    results.add(listFiles(fileIO, location, partitionRow, format));
                 }
             }
             return results;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
similarity index 97%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
index db657fe4cc..1dfef1670c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.paimon.hive.migrate;
+package org.apache.paimon.hive.clone;

 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 56c985f899..ca8c56b2eb 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -58,8 +58,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;

 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
-import static org.apache.paimon.hive.migrate.HiveCloneUtils.HIDDEN_PATH_FILTER;
-import static org.apache.paimon.hive.migrate.HiveCloneUtils.parseFormat;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
new file mode 100644
index 0000000000..7dbdbafc28
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.util.StreamerUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.METADATA_ENABLED;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * A file index which supports listing files efficiently through metadata table.
+ *
+ * @see org.apache.hudi.source.FileIndex
+ */
+public class FileIndex {
+
+    private final Path path;
+    private final HoodieMetadataConfig metadataConfig;
+    private final Configuration hadoopConf;
+    private final RowType partitionType;
+    @Nullable private final PartitionPredicate partitionPredicate;
+    private final HoodieEngineContext engineContext;
+    private final HoodieStorage storage;
+    private final HoodieTableMetaClient metaClient;
+    private final List<BinaryWriter.ValueSetter> valueSetters;
+
+    private List<String> partitionPaths;
+
+    public FileIndex(
+            String location,
+            Map<String, String> conf,
+            RowType partitionType,
+            @Nullable PartitionPredicate partitionPredicate) {
+        this.path = new Path(location);
+        this.metadataConfig = metadataConfig(conf);
+        this.hadoopConf =
+                HadoopConfigurations.getHadoopConf(
+                        org.apache.flink.configuration.Configuration.fromMap(conf));
+        this.partitionType = partitionType;
+        this.partitionPredicate = partitionPredicate;
+        this.engineContext = new HoodieFlinkEngineContext(hadoopConf);
+        this.storage = new HoodieHadoopStorage(path, hadoopConf);
+        this.metaClient = StreamerUtil.createMetaClient(location, hadoopConf);
+        this.valueSetters = new ArrayList<>();
+        partitionType
+                .getFieldTypes()
+                .forEach(type -> this.valueSetters.add(BinaryWriter.createValueSetter(type)));
+    }
+
+    public boolean isPartitioned() {
+        List<String> partitionPaths = getAllPartitionPaths();
+        if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) {
+            checkState(
+                    partitionType.getFieldCount() == 0,
+                    "Hudi table is non-partitioned but partition type isn't empty.");
+            return false;
+        } else {
+            checkState(
+                    partitionType.getFieldCount() >= 0,
+                    "Hudi table is partitioned but partition type is empty.");
+            return true;
+        }
+    }
+
+    public List<HivePartitionFiles> getAllFilteredPartitionFiles(FileIO fileIO) {
+        Map<String, BinaryRow> pathToRowMap = new HashMap<>();
+
+        for (String partitionPath : getAllPartitionPaths()) {
+            BinaryRow partitionRow = toPartitionRow(partitionPath);
+            if (partitionPredicate == null || partitionPredicate.test(partitionRow)) {
+                pathToRowMap.put(fullPartitionPath(partitionPath), partitionRow);
+            }
+        }
+
+        Map<String, List<StoragePathInfo>> filePathInfos;
+        try (HoodieTableMetadata tableMetadata =
+                HoodieTableMetadata.create(
+                        engineContext, storage, metadataConfig, path.toString())) {
+            filePathInfos = tableMetadata.getAllFilesInPartitions(pathToRowMap.keySet());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        List<HivePartitionFiles> result = new ArrayList<>(filePathInfos.size());
+        for (Map.Entry<String, List<StoragePathInfo>> filePathInfoEntry :
+                filePathInfos.entrySet()) {
+            result.add(
+                    toPartitionFiles(
+                            getBaseFiles(filePathInfoEntry.getValue()),
+                            pathToRowMap.get(filePathInfoEntry.getKey()),
+                            fileIO));
+        }
+        return result;
+    }
+
+    public HivePartitionFiles getUnpartitionedFiles(FileIO fileIO) {
+        List<StoragePathInfo> filePathInfos;
+        try (HoodieTableMetadata tableMetadata =
+                HoodieTableMetadata.create(
+                        engineContext, storage, metadataConfig, path.toString())) {
+            filePathInfos = tableMetadata.getAllFilesInPartition(new StoragePath(path.toUri()));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return toPartitionFiles(getBaseFiles(filePathInfos), BinaryRow.EMPTY_ROW, fileIO);
+    }
+
+    private List<HoodieBaseFile> getBaseFiles(List<StoragePathInfo> filePathInfos) {
+        HoodieTableFileSystemView fsView =
+                new HoodieTableFileSystemView(
+                        metaClient,
+                        metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(),
+                        filePathInfos);
+        return fsView.getLatestBaseFiles().collect(Collectors.toList());
+    }
+
+    private BinaryRow toPartitionRow(String partitionPath) {
+        List<String> partitionValues = extractPartitionValues(partitionPath);
+        return FileMetaUtils.writePartitionValue(
+                partitionType,
+                partitionValues,
+                valueSetters,
+                // TODO: different engine may use different name, pass correct name later
+                PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH);
+    }
+
+    private List<String> extractPartitionValues(String partitionPath) {
+        String[] paths = partitionPath.split(StoragePath.SEPARATOR);
+        // TODO: because we only support table in HMS, the partition is hive-style by default
+        // pt1=v1/pt2=v2
+        List<String> partitionValues = new ArrayList<>();
+        Arrays.stream(paths)
+                .forEach(
+                        p -> {
+                            String[] kv = p.split("=");
+                            if (kv.length == 2) {
+                                partitionValues.add(kv[1]);
+                            } else {
+                                throw new RuntimeException(
+                                        "Wrong hudi partition path: " + partitionPath);
+                            }
+                        });
+        return partitionValues;
+    }
+
+    private String fullPartitionPath(String partitionPath) {
+        return new Path(path, partitionPath).toString();
+    }
+
+    private List<String> getAllPartitionPaths() {
+        if (partitionPaths == null) {
+            partitionPaths =
+                    FSUtils.getAllPartitionPaths(
+                            engineContext, storage, metadataConfig, path.toString());
+        }
+        return partitionPaths;
+    }
+
+    private static HoodieMetadataConfig metadataConfig(Map<String, String> conf) {
+        Properties properties = new Properties();
+
+        // set up metadata.enabled=true in table DDL to enable metadata listing
+        boolean enable =
+                org.apache.flink.configuration.Configuration.fromMap(conf).get(METADATA_ENABLED);
+        properties.put(HoodieMetadataConfig.ENABLE.key(), enable);
+
+        return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+    }
+
+    private HivePartitionFiles toPartitionFiles(
+            List<HoodieBaseFile> hoodieBaseFiles, BinaryRow partition, FileIO fileIO) {
+        List<org.apache.paimon.fs.Path> paths = new ArrayList<>(hoodieBaseFiles.size());
+        List<Long> fileSizes = new ArrayList<>(hoodieBaseFiles.size());
+        String format = null;
+        for (HoodieBaseFile baseFile : hoodieBaseFiles) {
+            org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(baseFile.getPath());
+            if (format == null) {
+                format = parseFormat(path.toString());
+            }
+            long fileSize = baseFile.getFileSize();
+            if (fileSize == -1) {
+                try {
+                    fileSize = fileIO.getFileSize(path);
+                } catch (IOException ignored) {
+                }
+            }
+            paths.add(path);
+            fileSizes.add(fileSize);
+        }
+        return new HivePartitionFiles(partition, paths, fileSizes, format);
+    }
+
+    private String parseFormat(String path) {
+        if (path.endsWith(".parquet")) {
+            return "parquet";
+        } else if (path.endsWith(".orc")) {
+            return "orc";
+        } else {
+            throw new RuntimeException("Cannot extract format from file " + path);
+        }
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
new file mode 100644
index 0000000000..2ab091411a
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.table.catalog.TableOptionProperties;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Utils for cloning Hudi tables. Currently, only support COW Hudi tables in Hive metastore, and it
+ * will be cloned to unaware-bucket table.
+ */
+public class HudiCloneUtils {
+
+    public static boolean isHoodieTable(Table hiveTable) {
+        return hiveTable
+                .getParameters()
+                .getOrDefault(TableOptionProperties.SPARK_SOURCE_PROVIDER, "")
+                .equalsIgnoreCase("hudi");
+    }
+
+    public static List<HivePartitionFiles> listFiles(
+            Table hudiHiveTable,
+            FileIO fileIO,
+            RowType partitionType,
+            @Nullable PartitionPredicate partitionPredicate) {
+        Map<String, String> options = hudiHiveTable.getParameters();
+        checkTableType(options);
+
+        String location = hudiHiveTable.getSd().getLocation();
+        FileIndex fileIndex = new FileIndex(location, options, partitionType, partitionPredicate);
+
+        if (fileIndex.isPartitioned()) {
+            return fileIndex.getAllFilteredPartitionFiles(fileIO);
+        } else {
+            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
+        }
+    }
+
+    private static void checkTableType(Map<String, String> conf) {
+        String type = Configuration.fromMap(conf).get(TABLE_TYPE);
+        checkArgument(
+                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
+                "Only Hudi COW table is supported yet but found %s table.",
+                type);
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 4fdaf704ec..bdb1e5781f 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -566,6 +566,37 @@ under the License.
             </exclusions>
         </dependency>

+        <!-- for huid clone test -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink1.18-bundle</artifactId>
+            <version>${hudi.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- this is for huid clone test, and it should be put before hive-exec
+        for the same reason as org.apache.avro:avro. -->
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!--
         Why we need this test dependency:

        IDEA reads classes from the same project from target/classes of that module,
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
new file mode 100644
index 0000000000..decec2e136
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** Test clone Hudi table. */
+public class CloneActionForHudiITCase extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9089;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        super.before();
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        super.after();
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testMigrateOneNonPartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+
+        tEnv.executeSql(
+                "CREATE TABLE hudi_table ("
+                        + " id STRING PRIMARY KEY NOT ENFORCED,"
+                        + " name STRING,"
+                        + " price INT"
+                        + ") WITH ("
+                        + " 'connector' = 'hudi',"
+                        + String.format(
+                                "'path' = '%s/%s/hudi_table',",
+                                System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                                UUID.randomUUID())
+                        + " 'table.type' = 'COPY_ON_WRITE',"
+                        + " 'hive_sync.enable' = 'true',"
+                        + " 'hive_sync.mode' = 'hms',"
+                        + String.format(
+                                "'hive_sync.metastore.uris' = 'thrift://localhost:%s',", PORT)
+                        + " 'hive_sync.db' = 'default',"
+                        + " 'hive_sync.table' = 'hudi_table'"
+                        + ")");
+
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "A", i));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+
+        // test pk
+        insertValues.clear();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "B", i));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hudi_table").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "hudi_table",
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        List<Row> r2 =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM test.test_table").collect());
+
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
+    @Test
+    public void testMigrateOnePartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+
+        tEnv.executeSql(
+                "CREATE TABLE hudi_table ("
+                        + " id STRING PRIMARY KEY NOT ENFORCED,"
+                        + " name STRING,"
+                        + " pt STRING"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'connector' = 'hudi',"
+                        + String.format(
+                                "'path' = '%s/%s/hudi_table',",
+                                System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                                UUID.randomUUID())
+                        + " 'table.type' = 'COPY_ON_WRITE',"
+                        + " 'hive_sync.enable' = 'true',"
+                        + " 'hive_sync.mode' = 'hms',"
+                        + String.format(
+                                "'hive_sync.metastore.uris' = 'thrift://localhost:%s',", PORT)
+                        + " 'hive_sync.db' = 'default',"
+                        + " 'hive_sync.table' = 'hudi_table',"
+                        + " 'hive_sync.partition_fields' = 'pt',"
+                        + " 'hoodie.datasource.write.hive_style_partitioning' = 'true',"
+                        + " 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'"
+                        + ")");
+
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', '%s')", i, "A", "2025-01-01"));
+        }
+        tEnv.executeSql("insert into hudi_table values " + String.join(",", insertValues)).await();
+
+        // test pk
+        insertValues.clear();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', '%s')", i, "B", "2025-01-01"));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hudi_table").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "hudi_table",
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        List<Row> r2 =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM test.test_table").collect());
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+}
diff --git a/pom.xml b/pom.xml
index 87eb328824..82b472d699 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@ under the License.
         <flink.reuseForks>true</flink.reuseForks>
         <testcontainers.version>1.19.1</testcontainers.version>
         <iceberg.version>1.6.1</iceberg.version>
+        <hudi.version>0.15.0</hudi.version>
         <parquet.version>1.15.1</parquet.version>
         <orc.version>1.9.2</orc.version>
         <protobuf-java.version>3.19.6</protobuf-java.version>
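For reference, the new CloneActionForHudiITCase drives CloneAction programmatically through the
test helper createAction(...). A minimal command-line sketch of the same clone, assuming the
standard paimon-flink-action entrypoint; the flag names are taken verbatim from the test above,
while the jar path, version, thrift URI, and warehouse path are placeholders, not part of this
commit:

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-<version>.jar \
        clone \
        --database default \
        --table hudi_table \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://localhost:9083 \
        --target_database test \
        --target_table test_table \
        --target_catalog_conf warehouse=/path/to/paimon/warehouse

As in the test, the source must be a COPY_ON_WRITE Hudi table registered in the Hive metastore
(HudiCloneUtils.checkTableType enforces this), and it is cloned into an unaware-bucket Paimon
table.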
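Note also that FileIndex only consults the Hudi metadata table when metadata listing is enabled
on the source table: metadataConfig(...) above reads FlinkOptions.METADATA_ENABLED
("metadata.enabled") from the table options. A hedged DDL sketch, reusing the test's CREATE TABLE
shape with that one extra option (the path is a placeholder; all other options as in the ITCase):

    CREATE TABLE hudi_table (
        id STRING PRIMARY KEY NOT ENFORCED,
        name STRING,
        price INT
    ) WITH (
        'connector' = 'hudi',
        'path' = '/path/to/warehouse/hudi_table',  -- placeholder
        'table.type' = 'COPY_ON_WRITE',
        'metadata.enabled' = 'true'  -- enables metadata-table listing in FileIndex
    );

Without this option the clone should still work; with metadata disabled, FSUtils.getAllPartitionPaths
and the metadata reader are expected to fall back to plain file-system listing.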
