This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit ab46437b29ee30f7430eea2610773956ae67adb2
Author: yuzelin <[email protected]>
AuthorDate: Fri May 9 17:12:15 2025 +0800

    [flink] Clone action support Hudi table
    
    This closes #5586
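    
    A minimal usage sketch: with this change, a Hudi COW table registered in
    the Hive metastore can be cloned into Paimon using the same clone action
    flags exercised by CloneActionForHudiITCase below; the action jar path,
    metastore URI, and warehouse path here are illustrative placeholders.
    
        <FLINK_HOME>/bin/flink run /path/to/paimon-flink-action-<version>.jar \
            clone \
            --database default \
            --table hudi_table \
            --catalog_conf metastore=hive \
            --catalog_conf uri=thrift://localhost:9083 \
            --target_database test \
            --target_table test_table \
            --target_catalog_conf warehouse=/path/to/paimon/warehouse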
---
 .../apache/paimon/flink/clone/CloneFileInfo.java   |   2 +-
 .../org/apache/paimon/flink/clone/CloneUtils.java  |   2 +-
 .../paimon/flink/clone/ListCloneFilesFunction.java |  27 ++-
 paimon-hive/paimon-hive-catalog/pom.xml            |  29 +++
 .../hive/{migrate => clone}/HiveCloneUtils.java    |  53 ++++-
 .../{migrate => clone}/HivePartitionFiles.java     |   2 +-
 .../apache/paimon/hive/migrate/HiveMigrator.java   |   4 +-
 .../java/org/apache/paimon/hudi/FileIndex.java     | 253 +++++++++++++++++++++
 .../org/apache/paimon/hudi/HudiCloneUtils.java     |  78 +++++++
 paimon-hive/paimon-hive-connector-common/pom.xml   |  31 +++
 .../paimon/hudi/CloneActionForHudiITCase.java      | 195 ++++++++++++++++
 pom.xml                                            |   1 +
 12 files changed, 657 insertions(+), 20 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
index f0c3382b02..649a509d62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.clone;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
 
 import java.io.Serializable;
 import java.util.ArrayList;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
index 6dbe35ee85..5a53c39fa2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
@@ -23,7 +23,7 @@ import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CloneAction;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.hive.clone.HiveCloneUtils;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.StringUtils;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
index 8f0592302f..40dd2d53c6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
@@ -23,8 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
-import org.apache.paimon.hive.migrate.HiveCloneUtils;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveCloneUtils;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -111,15 +111,20 @@ public class ListCloneFilesFunction
         PartitionPredicate predicate =
                 getPartitionPredicate(whereSql, table.schema().logicalPartitionType(), tuple.f0);
 
-        List<HivePartitionFiles> allPartitions =
-                HiveCloneUtils.listFiles(
-                        hiveCatalog,
-                        tuple.f0,
-                        table.schema().logicalPartitionType(),
-                        table.coreOptions().partitionDefaultName(),
-                        predicate);
-        for (HivePartitionFiles partitionFiles : allPartitions) {
-            CloneFileInfo.fromHive(tuple.f1, partitionFiles).forEach(collector::collect);
+        try {
+            List<HivePartitionFiles> allPartitions =
+                    HiveCloneUtils.listFiles(
+                            hiveCatalog,
+                            tuple.f0,
+                            table.schema().logicalPartitionType(),
+                            table.coreOptions().partitionDefaultName(),
+                            predicate);
+            for (HivePartitionFiles partitionFiles : allPartitions) {
+                CloneFileInfo.fromHive(tuple.f1, partitionFiles).forEach(collector::collect);
+            }
+        } catch (Exception e) {
+            throw new Exception(
+                    "Failed to list clone files for table " + tuple.f0.getFullName(), e);
         }
     }
 
diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index 5c49a76f1d..fc94856633 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -112,6 +112,35 @@ under the License.
             </exclusions>
         </dependency>
 
+        <!-- for hudi clone -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test -->
 
         <dependency>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
similarity index 85%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index 6f88cd32ea..b3e53cd8d3 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.hive.migrate;
+package org.apache.paimon.hive.clone;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hudi.HudiCloneUtils;
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -115,7 +118,7 @@ public class HiveCloneUtils {
         }
 
         Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = client.getSchema(database, table);
+        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
         List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
         Map<String, String> hiveTableOptions = hiveTable.getParameters();
 
@@ -158,6 +161,23 @@ public class HiveCloneUtils {
         return schemaBuilder.build();
     }
 
+    private static List<FieldSchema> getHiveSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception {
+        List<FieldSchema> fields = client.getSchema(database, table);
+        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
+            return fields;
+        }
+
+        Set<String> hudiMetadataFields =
+                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
+                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
+                        .collect(Collectors.toSet());
+        return fields.stream()
+                .filter(f -> !hudiMetadataFields.contains(f.getName()))
+                .collect(Collectors.toList());
+    }
+
     public static List<HivePartitionFiles> listFiles(
             HiveCatalog hiveCatalog,
             Identifier identifier,
@@ -168,6 +188,31 @@ public class HiveCloneUtils {
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         Table sourceTable =
                 client.getTable(identifier.getDatabaseName(), identifier.getTableName());
+
+        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
+            return HudiCloneUtils.listFiles(
+                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
+        } else {
+            return listFromPureHiveTable(
+                    client,
+                    identifier,
+                    sourceTable,
+                    hiveCatalog.fileIO(),
+                    partitionRowType,
+                    defaultPartitionName,
+                    predicate);
+        }
+    }
+
+    private static List<HivePartitionFiles> listFromPureHiveTable(
+            IMetaStoreClient client,
+            Identifier identifier,
+            Table sourceTable,
+            FileIO fileIO,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
+            throws Exception {
         List<Partition> partitions =
                 client.listPartitions(
                         identifier.getDatabaseName(), identifier.getTableName(), Short.MAX_VALUE);
@@ -176,7 +221,7 @@ public class HiveCloneUtils {
         if (partitions.isEmpty()) {
             String location = sourceTable.getSd().getLocation();
             return Collections.singletonList(
-                    listFiles(hiveCatalog.fileIO(), location, BinaryRow.EMPTY_ROW, format));
+                    listFiles(fileIO, location, BinaryRow.EMPTY_ROW, format));
         } else {
             List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
             partitionRowType
@@ -193,7 +238,7 @@ public class HiveCloneUtils {
                                 valueSetters,
                                 defaultPartitionName);
                 if (predicate == null || predicate.test(partitionRow)) {
-                    results.add(listFiles(hiveCatalog.fileIO(), location, partitionRow, format));
+                    results.add(listFiles(fileIO, location, partitionRow, format));
                 }
             }
             return results;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
similarity index 97%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
index db657fe4cc..1dfef1670c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HivePartitionFiles.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HivePartitionFiles.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.hive.migrate;
+package org.apache.paimon.hive.clone;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 56c985f899..ca8c56b2eb 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -58,8 +58,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
-import static org.apache.paimon.hive.migrate.HiveCloneUtils.HIDDEN_PATH_FILTER;
-import static org.apache.paimon.hive.migrate.HiveCloneUtils.parseFormat;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
new file mode 100644
index 0000000000..7dbdbafc28
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.util.StreamerUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.METADATA_ENABLED;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * A file index which supports listing files efficiently through the Hudi metadata table.
+ *
+ * @see org.apache.hudi.source.FileIndex
+ */
+public class FileIndex {
+
+    private final Path path;
+    private final HoodieMetadataConfig metadataConfig;
+    private final Configuration hadoopConf;
+    private final RowType partitionType;
+    @Nullable private final PartitionPredicate partitionPredicate;
+    private final HoodieEngineContext engineContext;
+    private final HoodieStorage storage;
+    private final HoodieTableMetaClient metaClient;
+    private final List<BinaryWriter.ValueSetter> valueSetters;
+
+    private List<String> partitionPaths;
+
+    public FileIndex(
+            String location,
+            Map<String, String> conf,
+            RowType partitionType,
+            @Nullable PartitionPredicate partitionPredicate) {
+        this.path = new Path(location);
+        this.metadataConfig = metadataConfig(conf);
+        this.hadoopConf =
+                HadoopConfigurations.getHadoopConf(
+                        org.apache.flink.configuration.Configuration.fromMap(conf));
+        this.partitionType = partitionType;
+        this.partitionPredicate = partitionPredicate;
+        this.engineContext = new HoodieFlinkEngineContext(hadoopConf);
+        this.storage = new HoodieHadoopStorage(path, hadoopConf);
+        this.metaClient = StreamerUtil.createMetaClient(location, hadoopConf);
+        this.valueSetters = new ArrayList<>();
+        partitionType
+                .getFieldTypes()
+                .forEach(type -> this.valueSetters.add(BinaryWriter.createValueSetter(type)));
+    }
+
+    public boolean isPartitioned() {
+        List<String> partitionPaths = getAllPartitionPaths();
+        if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) {
+            checkState(
+                    partitionType.getFieldCount() == 0,
+                    "Hudi table is non-partitioned but partition type isn't empty.");
+            return false;
+        } else {
+            checkState(
+                    partitionType.getFieldCount() > 0,
+                    "Hudi table is partitioned but partition type is empty.");
+            return true;
+        }
+    }
+
+    public List<HivePartitionFiles> getAllFilteredPartitionFiles(FileIO fileIO) {
+        Map<String, BinaryRow> pathToRowMap = new HashMap<>();
+
+        for (String partitionPath : getAllPartitionPaths()) {
+            BinaryRow partitionRow = toPartitionRow(partitionPath);
+            if (partitionPredicate == null || partitionPredicate.test(partitionRow)) {
+                pathToRowMap.put(fullPartitionPath(partitionPath), partitionRow);
+            }
+        }
+
+        Map<String, List<StoragePathInfo>> filePathInfos;
+        try (HoodieTableMetadata tableMetadata =
+                HoodieTableMetadata.create(
+                        engineContext, storage, metadataConfig, path.toString())) {
+            filePathInfos = tableMetadata.getAllFilesInPartitions(pathToRowMap.keySet());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        List<HivePartitionFiles> result = new ArrayList<>(filePathInfos.size());
+        for (Map.Entry<String, List<StoragePathInfo>> filePathInfoEntry :
+                filePathInfos.entrySet()) {
+            result.add(
+                    toPartitionFiles(
+                            getBaseFiles(filePathInfoEntry.getValue()),
+                            pathToRowMap.get(filePathInfoEntry.getKey()),
+                            fileIO));
+        }
+        return result;
+    }
+
+    public HivePartitionFiles getUnpartitionedFiles(FileIO fileIO) {
+        List<StoragePathInfo> filePathInfos;
+        try (HoodieTableMetadata tableMetadata =
+                HoodieTableMetadata.create(
+                        engineContext, storage, metadataConfig, path.toString())) {
+            filePathInfos = tableMetadata.getAllFilesInPartition(new StoragePath(path.toUri()));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return toPartitionFiles(getBaseFiles(filePathInfos), BinaryRow.EMPTY_ROW, fileIO);
+    }
+
+    private List<HoodieBaseFile> getBaseFiles(List<StoragePathInfo> filePathInfos) {
+        HoodieTableFileSystemView fsView =
+                new HoodieTableFileSystemView(
+                        metaClient,
+                        metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(),
+                        filePathInfos);
+        return fsView.getLatestBaseFiles().collect(Collectors.toList());
+    }
+
+    private BinaryRow toPartitionRow(String partitionPath) {
+        List<String> partitionValues = extractPartitionValues(partitionPath);
+        return FileMetaUtils.writePartitionValue(
+                partitionType,
+                partitionValues,
+                valueSetters,
+                // TODO: different engines may use different names; pass the correct name later
+                PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH);
+    }
+
+    private List<String> extractPartitionValues(String partitionPath) {
+        String[] paths = partitionPath.split(StoragePath.SEPARATOR);
+        // TODO: because we only support tables in HMS, the partition path is Hive-style by default,
+        // pt1=v1/pt2=v2
+        List<String> partitionValues = new ArrayList<>();
+        Arrays.stream(paths)
+                .forEach(
+                        p -> {
+                            String[] kv = p.split("=");
+                            if (kv.length == 2) {
+                                partitionValues.add(kv[1]);
+                            } else {
+                                throw new RuntimeException(
+                                        "Wrong hudi partition path: " + partitionPath);
+                            }
+                        });
+        return partitionValues;
+    }
+
+    private String fullPartitionPath(String partitionPath) {
+        return new Path(path, partitionPath).toString();
+    }
+
+    private List<String> getAllPartitionPaths() {
+        if (partitionPaths == null) {
+            partitionPaths =
+                    FSUtils.getAllPartitionPaths(
+                            engineContext, storage, metadataConfig, path.toString());
+        }
+        return partitionPaths;
+    }
+
+    private static HoodieMetadataConfig metadataConfig(Map<String, String> conf) {
+        Properties properties = new Properties();
+
+        // set 'metadata.enabled' = 'true' in the table DDL to enable metadata listing
+        boolean enable =
+                org.apache.flink.configuration.Configuration.fromMap(conf).get(METADATA_ENABLED);
+        properties.put(HoodieMetadataConfig.ENABLE.key(), enable);
+
+        return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+    }
+
+    private HivePartitionFiles toPartitionFiles(
+            List<HoodieBaseFile> hoodieBaseFiles, BinaryRow partition, FileIO fileIO) {
+        List<org.apache.paimon.fs.Path> paths = new ArrayList<>(hoodieBaseFiles.size());
+        List<Long> fileSizes = new ArrayList<>(hoodieBaseFiles.size());
+        String format = null;
+        for (HoodieBaseFile baseFile : hoodieBaseFiles) {
+            org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(baseFile.getPath());
+            if (format == null) {
+                format = parseFormat(path.toString());
+            }
+            long fileSize = baseFile.getFileSize();
+            if (fileSize == -1) {
+                try {
+                    fileSize = fileIO.getFileSize(path);
+                } catch (IOException ignored) {
+                }
+            }
+            paths.add(path);
+            fileSizes.add(fileSize);
+        }
+        return new HivePartitionFiles(partition, paths, fileSizes, format);
+    }
+
+    private String parseFormat(String path) {
+        if (path.endsWith(".parquet")) {
+            return "parquet";
+        } else if (path.endsWith(".orc")) {
+            return "orc";
+        } else {
+            throw new RuntimeException("Cannot extract format from file " + path);
+        }
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
new file mode 100644
index 0000000000..2ab091411a
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.table.catalog.TableOptionProperties;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Utils for cloning Hudi tables. Currently, only COW Hudi tables in the Hive metastore are
+ * supported, and they are cloned to unaware-bucket tables.
+ */
+public class HudiCloneUtils {
+
+    public static boolean isHoodieTable(Table hiveTable) {
+        return hiveTable
+                .getParameters()
+                .getOrDefault(TableOptionProperties.SPARK_SOURCE_PROVIDER, "")
+                .equalsIgnoreCase("hudi");
+    }
+
+    public static List<HivePartitionFiles> listFiles(
+            Table hudiHiveTable,
+            FileIO fileIO,
+            RowType partitionType,
+            @Nullable PartitionPredicate partitionPredicate) {
+        Map<String, String> options = hudiHiveTable.getParameters();
+        checkTableType(options);
+
+        String location = hudiHiveTable.getSd().getLocation();
+        FileIndex fileIndex = new FileIndex(location, options, partitionType, partitionPredicate);
+
+        if (fileIndex.isPartitioned()) {
+            return fileIndex.getAllFilteredPartitionFiles(fileIO);
+        } else {
+            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
+        }
+    }
+
+    private static void checkTableType(Map<String, String> conf) {
+        String type = Configuration.fromMap(conf).get(TABLE_TYPE);
+        checkArgument(
+                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
+                "Only Hudi COW tables are supported, but found %s table.",
+                type);
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 4fdaf704ec..bdb1e5781f 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -566,6 +566,37 @@ under the License.
             </exclusions>
         </dependency>
 
+        <!-- for hudi clone test -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink1.18-bundle</artifactId>
+            <version>${hudi.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- this is for the hudi clone test, and it should be put before hive-exec
+             for the same reason as org.apache.avro:avro. -->
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!--
         Why we need this test dependency:
         IDEA reads classes from the same project from target/classes of that module,
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
new file mode 100644
index 0000000000..decec2e136
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** IT cases for cloning Hudi tables. */
+public class CloneActionForHudiITCase extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9089;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        super.before();
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        super.after();
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testMigrateOneNonPartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+
+        tEnv.executeSql(
+                "CREATE TABLE hudi_table ("
+                        + "  id STRING PRIMARY KEY NOT ENFORCED,"
+                        + "  name STRING,"
+                        + "  price INT"
+                        + ") WITH ("
+                        + "  'connector' = 'hudi',"
+                        + String.format(
+                                "'path' = '%s/%s/hudi_table',",
+                                System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                                UUID.randomUUID())
+                        + "  'table.type' = 'COPY_ON_WRITE',"
+                        + "  'hive_sync.enable' = 'true',"
+                        + "  'hive_sync.mode' = 'hms',"
+                        + String.format(
+                                "'hive_sync.metastore.uris' = 'thrift://localhost:%s',", PORT)
+                        + "  'hive_sync.db' = 'default',"
+                        + "  'hive_sync.table' = 'hudi_table'"
+                        + ")");
+
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "A", i));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+
+        // test pk
+        insertValues.clear();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "B", i));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hudi_table").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "hudi_table",
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        List<Row> r2 =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM test.test_table").collect());
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
+    @Test
+    public void testMigrateOnePartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+
+        tEnv.executeSql(
+                "CREATE TABLE hudi_table ("
+                        + "  id STRING PRIMARY KEY NOT ENFORCED,"
+                        + "  name STRING,"
+                        + "  pt STRING"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + "  'connector' = 'hudi',"
+                        + String.format(
+                                "'path' = '%s/%s/hudi_table',",
+                                System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                                UUID.randomUUID())
+                        + "  'table.type' = 'COPY_ON_WRITE',"
+                        + "  'hive_sync.enable' = 'true',"
+                        + "  'hive_sync.mode' = 'hms',"
+                        + String.format(
+                                "'hive_sync.metastore.uris' = 'thrift://localhost:%s',", PORT)
+                        + "  'hive_sync.db' = 'default',"
+                        + "  'hive_sync.table' = 'hudi_table',"
+                        + "  'hive_sync.partition_fields' = 'pt',"
+                        + "  'hoodie.datasource.write.hive_style_partitioning' = 'true',"
+                        + "  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'"
+                        + ")");
+
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', '%s')", i, "A", "2025-01-01"));
+        }
+        tEnv.executeSql("insert into hudi_table values " + String.join(",", insertValues)).await();
+
+        // test pk
+        insertValues.clear();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', '%s')", i, "B", "2025-01-01"));
+        }
+        tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",", insertValues)).await();
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hudi_table").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "hudi_table",
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        List<Row> r2 =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM test.test_table").collect());
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+}
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@ under the License.
         <flink.reuseForks>true</flink.reuseForks>
         <testcontainers.version>1.19.1</testcontainers.version>
         <iceberg.version>1.6.1</iceberg.version>
+        <hudi.version>0.15.0</hudi.version>
         <parquet.version>1.15.1</parquet.version>
         <orc.version>1.9.2</orc.version>
         <protobuf-java.version>3.19.6</protobuf-java.version>

