This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit f4b8e3dd96b49304f1c576ac7e4549efa7a3de25
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 12 12:06:36 2025 +0800

    [hudi] Extract paimon hudi module
---
 paimon-hive/paimon-hive-catalog/pom.xml            |  29 ---
 .../paimon/hive/clone/HiveCloneExtractor.java      |  71 +++++++
 .../apache/paimon/hive/clone/HiveCloneUtils.java   | 185 ++----------------
 ...loneUtils.java => HiveTableCloneExtractor.java} | 208 +++++----------------
 .../org/apache/paimon/hudi/HudiCloneUtils.java     |  78 --------
 paimon-hive/paimon-hive-connector-common/pom.xml   |   7 +
 paimon-hudi/pom.xml                                |  70 +++++++
 .../java/org/apache/paimon/hudi/HudiFileIndex.java |   7 +-
 .../apache/paimon/hudi/HudiHiveCloneExtractor.java |  99 ++++++++++
 ...org.apache.paimon.hive.clone.HiveCloneExtractor |  16 ++
 pom.xml                                            |   1 +
 11 files changed, 331 insertions(+), 440 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index fc94856633..5c49a76f1d 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -112,35 +112,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <!-- for hudi clone -->
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-common</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-flink</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${paimon-flink-common.flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${paimon-flink-common.flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <!-- Test -->
 
         <dependency>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
new file mode 100644
index 0000000000..a223db56e3
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.clone;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/** An interface for hive clone schema and files extractor. */
+public interface HiveCloneExtractor {
+
+    boolean matches(Table table);
+
+    List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception;
+
+    List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
+            throws Exception;
+
+    List<String> extractPartitionKeys(Table table);
+
+    Map<String, String> extractOptions(Table table);
+
+    List<HiveCloneExtractor> EXTRACTORS =
+            FactoryUtil.discoverFactories(
+                    HiveCloneExtractor.class.getClassLoader(), HiveCloneExtractor.class);
+
+    static HiveCloneExtractor getExtractor(Table table) {
+        for (HiveCloneExtractor extractor : EXTRACTORS) {
+            if (extractor.matches(table)) {
+                return extractor;
+            }
+        }
+        return HiveTableCloneExtractor.INSTANCE;
+    }
+}
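
For readers following the refactor: implementations of the interface above are discovered through Paimon's factory SPI, so a new table format can plug into clone without touching the Hive module. Below is a minimal sketch of what a third-party implementation could look like, assuming the interface as added above; the class name DemoCloneExtractor and the "demo" provider key are hypothetical, not part of this commit.

    package org.example.clone;

    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.hive.clone.HiveCloneExtractor;
    import org.apache.paimon.hive.clone.HivePartitionFiles;
    import org.apache.paimon.partition.PartitionPredicate;
    import org.apache.paimon.types.RowType;

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;

    import javax.annotation.Nullable;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical extractor for tables whose Spark provider is "demo". */
    public class DemoCloneExtractor implements HiveCloneExtractor {

        @Override
        public boolean matches(Table table) {
            // getExtractor(...) walks the discovered extractors and returns the
            // first one whose matches() is true, falling back to the Hive default.
            return "demo".equalsIgnoreCase(
                    table.getParameters().getOrDefault("spark.sql.sources.provider", ""));
        }

        @Override
        public List<FieldSchema> extractSchema(
                IMetaStoreClient client, Table hiveTable, String database, String table)
                throws Exception {
            // Plain HMS schema; a real format would strip its metadata columns here.
            return client.getSchema(database, table);
        }

        @Override
        public List<HivePartitionFiles> extractFiles(
                IMetaStoreClient client,
                Table table,
                FileIO fileIO,
                Identifier identifier,
                RowType partitionRowType,
                String defaultPartitionName,
                @Nullable PartitionPredicate predicate) {
            return Collections.emptyList(); // placeholder: format-specific file listing
        }

        @Override
        public List<String> extractPartitionKeys(Table table) {
            return Collections.emptyList();
        }

        @Override
        public Map<String, String> extractOptions(Table table) {
            return Collections.emptyMap();
        }
    }

Such a class becomes visible to HiveCloneExtractor.EXTRACTORS by listing its fully qualified name in a META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor file, exactly as the new paimon-hudi module does at the end of this commit.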
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index b3e53cd8d3..e1326ede41 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -19,14 +19,8 @@
 package org.apache.paimon.hive.clone;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryWriter;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hudi.HudiCloneUtils;
-import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowType;
@@ -38,26 +32,17 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 
 /** Utils for cloning Hive table to Paimon table. */
@@ -118,38 +103,15 @@ public class HiveCloneUtils {
         }
 
         Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
-        List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
-        Map<String, String> hiveTableOptions = hiveTable.getParameters();
-
-        Map<String, String> paimonOptions = new HashMap<>();
-        // for compatible with hive comment system
-        if (hiveTableOptions.get("comment") != null) {
-            paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
-        }
-
-        String format = parseFormat(hiveTable);
-        paimonOptions.put(FILE_FORMAT.key(), format);
-        Map<String, String> formatOptions = getIdentifierPrefixOptions(format, hiveTableOptions);
-        Map<String, String> sdFormatOptions =
-                getIdentifierPrefixOptions(
-                        format, hiveTable.getSd().getSerdeInfo().getParameters());
-        formatOptions.putAll(sdFormatOptions);
-        paimonOptions.putAll(formatOptions);
-
-        String compression = parseCompression(hiveTable, format, formatOptions);
-        if (compression != null) {
-            paimonOptions.put(FILE_COMPRESSION.key(), compression);
-        }
-
+        HiveCloneExtractor extractor = HiveCloneExtractor.getExtractor(hiveTable);
+        List<FieldSchema> fields = extractor.extractSchema(client, hiveTable, database, table);
+        List<String> partitionKeys = extractor.extractPartitionKeys(hiveTable);
+        Map<String, String> options = extractor.extractOptions(hiveTable);
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
-                        .comment(hiveTableOptions.get("comment"))
-                        .options(paimonOptions)
-                        .partitionKeys(
-                                partitionFields.stream()
-                                        .map(FieldSchema::getName)
-                                        .collect(Collectors.toList()));
+                        .comment(options.get("comment"))
+                        .options(options)
+                        .partitionKeys(partitionKeys);
 
         fields.forEach(
                 field ->
@@ -161,23 +123,6 @@ public class HiveCloneUtils {
         return schemaBuilder.build();
     }
 
-    private static List<FieldSchema> getHiveSchema(
-            IMetaStoreClient client, Table hiveTable, String database, String table)
-            throws Exception {
-        List<FieldSchema> fields = client.getSchema(database, table);
-        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
-            return fields;
-        }
-
-        Set<String> hudiMetadataFields =
-                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
-                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
-                        .collect(Collectors.toSet());
-        return fields.stream()
-                .filter(f -> !hudiMetadataFields.contains(f.getName()))
-                .collect(Collectors.toList());
-    }
-
     public static List<HivePartitionFiles> listFiles(
             HiveCatalog hiveCatalog,
             Identifier identifier,
@@ -188,77 +133,15 @@ public class HiveCloneUtils {
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         Table sourceTable =
                 client.getTable(identifier.getDatabaseName(), identifier.getTableName());
-
-        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
-            return HudiCloneUtils.listFiles(
-                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
-        } else {
-            return listFromPureHiveTable(
-                    client,
-                    identifier,
-                    sourceTable,
-                    hiveCatalog.fileIO(),
-                    partitionRowType,
-                    defaultPartitionName,
-                    predicate);
-        }
-    }
-
-    private static List<HivePartitionFiles> listFromPureHiveTable(
-            IMetaStoreClient client,
-            Identifier identifier,
-            Table sourceTable,
-            FileIO fileIO,
-            RowType partitionRowType,
-            String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
-            throws Exception {
-        List<Partition> partitions =
-                client.listPartitions(
-                        identifier.getDatabaseName(), identifier.getTableName(), Short.MAX_VALUE);
-        String format = parseFormat(sourceTable);
-
-        if (partitions.isEmpty()) {
-            String location = sourceTable.getSd().getLocation();
-            return Collections.singletonList(
-                    listFiles(fileIO, location, BinaryRow.EMPTY_ROW, format));
-        } else {
-            List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
-            partitionRowType
-                    .getFieldTypes()
-                    .forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));
-            List<HivePartitionFiles> results = new ArrayList<>();
-            for (Partition partition : partitions) {
-                List<String> partitionValues = partition.getValues();
-                String location = partition.getSd().getLocation();
-                BinaryRow partitionRow =
-                        FileMetaUtils.writePartitionValue(
-                                partitionRowType,
-                                partitionValues,
-                                valueSetters,
-                                defaultPartitionName);
-                if (predicate == null || predicate.test(partitionRow)) {
-                    results.add(listFiles(fileIO, location, partitionRow, format));
-                }
-            }
-            return results;
-        }
-    }
-
-    private static HivePartitionFiles listFiles(
-            FileIO fileIO, String location, BinaryRow partition, String format) throws IOException {
-        List<FileStatus> fileStatuses =
-                Arrays.stream(fileIO.listStatus(new Path(location)))
-                        .filter(s -> !s.isDir())
-                        .filter(HIDDEN_PATH_FILTER)
-                        .collect(Collectors.toList());
-        List<Path> paths = new ArrayList<>();
-        List<Long> fileSizes = new ArrayList<>();
-        for (FileStatus fileStatus : fileStatuses) {
-            paths.add(fileStatus.getPath());
-            fileSizes.add(fileStatus.getLen());
-        }
-        return new HivePartitionFiles(partition, paths, fileSizes, format);
+        return HiveCloneExtractor.getExtractor(sourceTable)
+                .extractFiles(
+                        hiveCatalog.getHmsClient(),
+                        sourceTable,
+                        hiveCatalog.fileIO(),
+                        identifier,
+                        partitionRowType,
+                        defaultPartitionName,
+                        predicate);
     }
 
     private static String parseFormat(StorageDescriptor storageDescriptor) {
@@ -288,40 +171,4 @@ public class HiveCloneUtils {
         }
         return format;
     }
-
-    private static String parseCompression(StorageDescriptor storageDescriptor) {
-        Map<String, String> serderParams = storageDescriptor.getSerdeInfo().getParameters();
-        if (serderParams.containsKey("compression")) {
-            return serderParams.get("compression");
-        }
-        return null;
-    }
-
-    private static String parseCompression(
-            Table table, String format, Map<String, String> formatOptions) {
-        String compression = null;
-        if (Objects.equals(format, "avro")) {
-            compression = formatOptions.getOrDefault("avro.codec", parseCompression(table.getSd()));
-        } else if (Objects.equals(format, "parquet")) {
-            compression =
-                    formatOptions.getOrDefault(
-                            "parquet.compression", 
parseCompression(table.getSd()));
-        } else if (Objects.equals(format, "orc")) {
-            compression =
-                    formatOptions.getOrDefault("orc.compress", 
parseCompression(table.getSd()));
-        }
-        return compression;
-    }
-
-    public static Map<String, String> getIdentifierPrefixOptions(
-            String formatIdentifier, Map<String, String> options) {
-        Map<String, String> result = new HashMap<>();
-        String prefix = formatIdentifier.toLowerCase() + ".";
-        for (String key : options.keySet()) {
-            if (key.toLowerCase().startsWith(prefix)) {
-                result.put(prefix + key.substring(prefix.length()), options.get(key));
-            }
-        }
-        return result;
-    }
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
similarity index 50%
copy from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
copy to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
index b3e53cd8d3..0b503b0086 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
@@ -24,23 +24,15 @@ import org.apache.paimon.data.BinaryWriter;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hudi.HudiCloneUtils;
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowType;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -52,156 +44,80 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
 
-/** Utils for cloning Hive table to Paimon table. */
-public class HiveCloneUtils {
+/** A {@link HiveCloneExtractor} for hive tables. */
+public class HiveTableCloneExtractor implements HiveCloneExtractor {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveCloneUtils.class);
+    public static final HiveTableCloneExtractor INSTANCE = new HiveTableCloneExtractor();
 
-    public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
-            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
-
-    public static Map<String, String> getDatabaseOptions(
-            HiveCatalog hiveCatalog, String databaseName) throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        Database database = client.getDatabase(databaseName);
-        Map<String, String> paimonOptions = new HashMap<>();
-        if (database.getDescription() != null) {
-            paimonOptions.put("comment", database.getDescription());
-        }
-        return paimonOptions;
-    }
-
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog) throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        List<Identifier> results = new ArrayList<>();
-        for (String database : client.getAllDatabases()) {
-            for (String table : client.getAllTables(database)) {
-                results.add(Identifier.create(database, table));
-            }
-        }
-        return results;
+    @Override
+    public boolean matches(Table table) {
+        return true;
     }
 
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog, String database)
+    @Override
+    public List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
             throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        List<Identifier> results = new ArrayList<>();
-        for (String table : client.getAllTables(database)) {
-            results.add(Identifier.create(database, table));
-        }
-        return results;
+        return client.getSchema(database, table);
     }
 
-    public static Schema hiveTableToPaimonSchema(HiveCatalog hiveCatalog, Identifier identifier)
+    @Override
+    public List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
             throws Exception {
-        String database = identifier.getDatabaseName();
-        String table = identifier.getObjectName();
-
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        // check primary key
-        PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(database, table);
-        try {
-            if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
-                throw new IllegalArgumentException("Can't migrate primary key table yet.");
-            }
-        } catch (Exception e) {
-            LOG.warn(
-                    "Your Hive version is low which not support 
get_primary_keys, skip primary key check firstly!");
-        }
+        return listFromPureHiveTable(
+                client,
+                identifier,
+                table,
+                fileIO,
+                partitionRowType,
+                defaultPartitionName,
+                predicate);
+    }
 
-        Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
-        List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
-        Map<String, String> hiveTableOptions = hiveTable.getParameters();
+    @Override
+    public List<String> extractPartitionKeys(Table table) {
+        return table.getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+    }
 
+    @Override
+    public Map<String, String> extractOptions(Table table) {
+        Map<String, String> hiveTableOptions = table.getParameters();
         Map<String, String> paimonOptions = new HashMap<>();
-        // for compatible with hive comment system
-        if (hiveTableOptions.get("comment") != null) {
-            paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
+        String comment = hiveTableOptions.get("comment");
+        if (comment != null) {
+            paimonOptions.put("hive.comment", comment);
+            paimonOptions.put("comment", comment);
         }
 
-        String format = parseFormat(hiveTable);
+        String format = parseFormat(table);
         paimonOptions.put(FILE_FORMAT.key(), format);
         Map<String, String> formatOptions = getIdentifierPrefixOptions(format, hiveTableOptions);
         Map<String, String> sdFormatOptions =
-                getIdentifierPrefixOptions(
-                        format, hiveTable.getSd().getSerdeInfo().getParameters());
+                getIdentifierPrefixOptions(format, table.getSd().getSerdeInfo().getParameters());
         formatOptions.putAll(sdFormatOptions);
         paimonOptions.putAll(formatOptions);
 
-        String compression = parseCompression(hiveTable, format, 
formatOptions);
+        String compression = parseCompression(table, format, formatOptions);
         if (compression != null) {
             paimonOptions.put(FILE_COMPRESSION.key(), compression);
         }
-
-        Schema.Builder schemaBuilder =
-                Schema.newBuilder()
-                        .comment(hiveTableOptions.get("comment"))
-                        .options(paimonOptions)
-                        .partitionKeys(
-                                partitionFields.stream()
-                                        .map(FieldSchema::getName)
-                                        .collect(Collectors.toList()));
-
-        fields.forEach(
-                field ->
-                        schemaBuilder.column(
-                                field.getName(),
-                                toPaimonType(field.getType()),
-                                field.getComment()));
-
-        return schemaBuilder.build();
-    }
-
-    private static List<FieldSchema> getHiveSchema(
-            IMetaStoreClient client, Table hiveTable, String database, String table)
-            throws Exception {
-        List<FieldSchema> fields = client.getSchema(database, table);
-        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
-            return fields;
-        }
-
-        Set<String> hudiMetadataFields =
-                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
-                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
-                        .collect(Collectors.toSet());
-        return fields.stream()
-                .filter(f -> !hudiMetadataFields.contains(f.getName()))
-                .collect(Collectors.toList());
-    }
-
-    public static List<HivePartitionFiles> listFiles(
-            HiveCatalog hiveCatalog,
-            Identifier identifier,
-            RowType partitionRowType,
-            String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
-            throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        Table sourceTable =
-                client.getTable(identifier.getDatabaseName(), identifier.getTableName());
-
-        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
-            return HudiCloneUtils.listFiles(
-                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
-        } else {
-            return listFromPureHiveTable(
-                    client,
-                    identifier,
-                    sourceTable,
-                    hiveCatalog.fileIO(),
-                    partitionRowType,
-                    defaultPartitionName,
-                    predicate);
-        }
+        return paimonOptions;
     }
 
     private static List<HivePartitionFiles> listFromPureHiveTable(
@@ -211,7 +127,7 @@ public class HiveCloneUtils {
             FileIO fileIO,
             RowType partitionRowType,
             String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
+            @javax.annotation.Nullable PartitionPredicate predicate)
             throws Exception {
         List<Partition> partitions =
                 client.listPartitions(
@@ -261,34 +177,6 @@ public class HiveCloneUtils {
         return new HivePartitionFiles(partition, paths, fileSizes, format);
     }
 
-    private static String parseFormat(StorageDescriptor storageDescriptor) {
-        String serder = storageDescriptor.getSerdeInfo().toString();
-        if (serder.contains("avro")) {
-            return "avro";
-        } else if (serder.contains("parquet")) {
-            return "parquet";
-        } else if (serder.contains("orc")) {
-            return "orc";
-        }
-        return null;
-    }
-
-    public static String parseFormat(Table table) {
-        String format = parseFormat(table.getSd());
-        if (format == null) {
-            throw new UnsupportedOperationException("Unknown table format:" + table);
-        }
-        return format;
-    }
-
-    public static String parseFormat(Partition partition) {
-        String format = parseFormat(partition.getSd());
-        if (format == null) {
-        throw new UnsupportedOperationException("Unknown partition format: " + partition);
-        }
-        return format;
-    }
-
     private static String parseCompression(StorageDescriptor storageDescriptor) {
         Map<String, String> serderParams = storageDescriptor.getSerdeInfo().getParameters();
         if (serderParams.containsKey("compression")) {
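
To make the option handling retained in HiveTableCloneExtractor concrete: getIdentifierPrefixOptions keeps only keys scoped to the detected format, matching the prefix case-insensitively while normalizing it to lower case. A self-contained sketch mirroring that logic (the class name and sample values are invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    /** Standalone sketch of the prefix filtering kept in HiveTableCloneExtractor. */
    public class PrefixOptionsDemo {

        // Mirrors getIdentifierPrefixOptions from the diff: keep only options whose
        // key starts with "<format>." (case-insensitive), normalizing the prefix.
        static Map<String, String> identifierPrefixOptions(
                String formatIdentifier, Map<String, String> options) {
            Map<String, String> result = new HashMap<>();
            String prefix = formatIdentifier.toLowerCase() + ".";
            for (String key : options.keySet()) {
                if (key.toLowerCase().startsWith(prefix)) {
                    result.put(prefix + key.substring(prefix.length()), options.get(key));
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, String> hiveParams = new HashMap<>();
            hiveParams.put("ORC.compress", "ZLIB"); // format-scoped: kept, prefix lower-cased
            hiveParams.put("comment", "orders table"); // not format-scoped: dropped
            System.out.println(identifierPrefixOptions("orc", hiveParams));
            // prints: {orc.compress=ZLIB}
        }
    }

extractOptions then layers these keys on top of FILE_FORMAT, and adds FILE_COMPRESSION when a codec can be parsed from them or from the storage descriptor.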
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
deleted file mode 100644
index 2ab091411a..0000000000
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.hudi;
-
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.hive.clone.HivePartitionFiles;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.table.catalog.TableOptionProperties;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/**
- * Utils for cloning Hudi tables. Currently, only support COW Hudi tables in Hive metastore, and it
- * will be cloned to unaware-bucket table.
- */
-public class HudiCloneUtils {
-
-    public static boolean isHoodieTable(Table hiveTable) {
-        return hiveTable
-                .getParameters()
-                .getOrDefault(TableOptionProperties.SPARK_SOURCE_PROVIDER, "")
-                .equalsIgnoreCase("hudi");
-    }
-
-    public static List<HivePartitionFiles> listFiles(
-            Table hudiHiveTable,
-            FileIO fileIO,
-            RowType partitionType,
-            @Nullable PartitionPredicate partitionPredicate) {
-        Map<String, String> options = hudiHiveTable.getParameters();
-        checkTableType(options);
-
-        String location = hudiHiveTable.getSd().getLocation();
-        FileIndex fileIndex = new FileIndex(location, options, partitionType, partitionPredicate);
-
-        if (fileIndex.isPartitioned()) {
-            return fileIndex.getAllFilteredPartitionFiles(fileIO);
-        } else {
-            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
-        }
-    }
-
-    private static void checkTableType(Map<String, String> conf) {
-        String type = Configuration.fromMap(conf).get(TABLE_TYPE);
-        checkArgument(
-                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
-                "Only Hudi COW table is supported yet but found %s table.",
-                type);
-    }
-}
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index bdb1e5781f..f7d775761c 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -567,6 +567,13 @@ under the License.
         </dependency>
 
         <!-- for hudi clone test -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hudi</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-flink1.18-bundle</artifactId>
diff --git a/paimon-hudi/pom.xml b/paimon-hudi/pom.xml
new file mode 100644
index 0000000000..836f1ca620
--- /dev/null
+++ b/paimon-hudi/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-hudi</artifactId>
+    <name>Paimon : Hudi</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
similarity index 98%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
rename to paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
index 7dbdbafc28..de0c93b50c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
@@ -63,11 +63,10 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  *
  * @see org.apache.hudi.source.FileIndex
  */
-public class FileIndex {
+public class HudiFileIndex {
 
     private final Path path;
     private final HoodieMetadataConfig metadataConfig;
-    private final Configuration hadoopConf;
     private final RowType partitionType;
     @Nullable private final PartitionPredicate partitionPredicate;
     private final HoodieEngineContext engineContext;
@@ -77,14 +76,14 @@ public class FileIndex {
 
     private List<String> partitionPaths;
 
-    public FileIndex(
+    public HudiFileIndex(
             String location,
             Map<String, String> conf,
             RowType partitionType,
             @Nullable PartitionPredicate partitionPredicate) {
         this.path = new Path(location);
         this.metadataConfig = metadataConfig(conf);
-        this.hadoopConf =
+        Configuration hadoopConf =
                 HadoopConfigurations.getHadoopConf(
                         org.apache.flink.configuration.Configuration.fromMap(conf));
         this.partitionType = partitionType;
diff --git a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
new file mode 100644
index 0000000000..b83a493b18
--- /dev/null
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HiveCloneExtractor;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveTableCloneExtractor;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link HiveCloneExtractor} for Hudi tables. */
+public class HudiHiveCloneExtractor extends HiveTableCloneExtractor {
+
+    @Override
+    public boolean matches(Table table) {
+        return table.getParameters()
+                .getOrDefault("spark.sql.sources.provider", "")
+                .equalsIgnoreCase("hudi");
+    }
+
+    @Override
+    public List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception {
+        List<FieldSchema> fields = client.getSchema(database, table);
+        Set<String> hudiMetadataFields =
+                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
+                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
+                        .collect(Collectors.toSet());
+        return fields.stream()
+                .filter(f -> !hudiMetadataFields.contains(f.getName()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate) {
+        Map<String, String> options = table.getParameters();
+        checkTableType(options);
+
+        String location = table.getSd().getLocation();
+        HudiFileIndex fileIndex = new HudiFileIndex(location, options, partitionRowType, predicate);
+
+        if (fileIndex.isPartitioned()) {
+            return fileIndex.getAllFilteredPartitionFiles(fileIO);
+        } else {
+            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
+        }
+    }
+
+    private static void checkTableType(Map<String, String> conf) {
+        String type = conf.getOrDefault("table.type", HoodieTableType.COPY_ON_WRITE.name());
+        checkArgument(
+                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
+                "Only Hudi COW table is supported yet but found %s table.",
+                type);
+    }
+}
diff --git a/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor b/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
new file mode 100644
index 0000000000..06d88ce0bb
--- /dev/null
+++ b/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.hudi.HudiHiveCloneExtractor
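
Putting the pieces together, a hedged sketch of the runtime dispatch, assuming paimon-hudi and the service entry above are on the classpath (the demo class itself is illustrative, not part of this commit):

    package org.example;

    import org.apache.paimon.hive.clone.HiveCloneExtractor;
    import org.apache.paimon.hudi.HudiHiveCloneExtractor;

    import org.apache.hadoop.hive.metastore.api.Table;

    import java.util.HashMap;
    import java.util.Map;

    /** Sketch: SPI discovery should route Hudi-provider tables to HudiHiveCloneExtractor. */
    public class ExtractorDispatchDemo {
        public static void main(String[] args) {
            // The Thrift-generated Table has a no-arg constructor and setters.
            Table hiveTable = new Table();
            Map<String, String> params = new HashMap<>();
            params.put("spark.sql.sources.provider", "hudi"); // the key matches() inspects
            hiveTable.setParameters(params);

            // FactoryUtil.discoverFactories finds the Hudi extractor through the
            // META-INF/services entry above; its matches() returns true for this table.
            HiveCloneExtractor extractor = HiveCloneExtractor.getExtractor(hiveTable);
            System.out.println(extractor instanceof HudiHiveCloneExtractor); // expected: true
        }
    }

For any table whose parameters do not name a matching provider, getExtractor falls back to HiveTableCloneExtractor.INSTANCE, preserving the pre-refactor Hive behavior.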
diff --git a/pom.xml b/pom.xml
index 82b472d699..f43193dfe5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@ under the License.
         <module>paimon-arrow</module>
         <module>tools/ci/paimon-ci-tools</module>
         <module>paimon-open-api</module>
+        <module>paimon-hudi</module>
     </modules>
 
     <properties>

