This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f4b8e3dd96b49304f1c576ac7e4549efa7a3de25
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 12 12:06:36 2025 +0800

    [hudi] Extract paimon hudi module
---
 paimon-hive/paimon-hive-catalog/pom.xml            |  29 ---
 .../paimon/hive/clone/HiveCloneExtractor.java      |  71 +++++++
 .../apache/paimon/hive/clone/HiveCloneUtils.java   | 185 ++----------------
 ...loneUtils.java => HiveTableCloneExtractor.java} | 208 +++++----------------
 .../org/apache/paimon/hudi/HudiCloneUtils.java     |  78 --------
 paimon-hive/paimon-hive-connector-common/pom.xml   |   7 +
 paimon-hudi/pom.xml                                |  70 +++++++
 .../java/org/apache/paimon/hudi/HudiFileIndex.java |   7 +-
 .../apache/paimon/hudi/HudiHiveCloneExtractor.java |  99 ++++++++++
 ...org.apache.paimon.hive.clone.HiveCloneExtractor |  16 ++
 pom.xml                                            |   1 +
 11 files changed, 331 insertions(+), 440 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index fc94856633..5c49a76f1d 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -112,35 +112,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <!-- for hudi clone -->
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-common</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-flink</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${paimon-flink-common.flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${paimon-flink-common.flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <!-- Test -->
 
         <dependency>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
new file mode 100644
index 0000000000..a223db56e3
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.clone;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/** An interface for hive clone schema and files extractor. */
+public interface HiveCloneExtractor {
+
+    boolean matches(Table table);
+
+    List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception;
+
+    List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
+            throws Exception;
+
+    List<String> extractPartitionKeys(Table table);
+
+    Map<String, String> extractOptions(Table table);
+
+    List<HiveCloneExtractor> EXTRACTORS =
+            FactoryUtil.discoverFactories(
+                    HiveCloneExtractor.class.getClassLoader(), HiveCloneExtractor.class);
+
+    static HiveCloneExtractor getExtractor(Table table) {
+        for (HiveCloneExtractor extractor : EXTRACTORS) {
+            if (extractor.matches(table)) {
+                return extractor;
+            }
+        }
+        return HiveTableCloneExtractor.INSTANCE;
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index b3e53cd8d3..e1326ede41 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -19,14 +19,8 @@
 package org.apache.paimon.hive.clone;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryWriter;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hudi.HudiCloneUtils;
-import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowType;
@@ -38,26 +32,17 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 
 /** Utils for cloning Hive table to Paimon table. */
@@ -118,38 +103,15 @@ public class HiveCloneUtils {
         }
 
         Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
-        List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
-        Map<String, String> hiveTableOptions = hiveTable.getParameters();
-
-        Map<String, String> paimonOptions = new HashMap<>();
-        // for compatible with hive comment system
-        if (hiveTableOptions.get("comment") != null) {
-            paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
-        }
-
-        String format = parseFormat(hiveTable);
-        paimonOptions.put(FILE_FORMAT.key(), format);
-        Map<String, String> formatOptions = getIdentifierPrefixOptions(format, hiveTableOptions);
-        Map<String, String> sdFormatOptions =
-                getIdentifierPrefixOptions(
-                        format, hiveTable.getSd().getSerdeInfo().getParameters());
-        formatOptions.putAll(sdFormatOptions);
-        paimonOptions.putAll(formatOptions);
-
-        String compression = parseCompression(hiveTable, format, formatOptions);
-        if (compression != null) {
-            paimonOptions.put(FILE_COMPRESSION.key(), compression);
-        }
-
+        HiveCloneExtractor extractor = HiveCloneExtractor.getExtractor(hiveTable);
+        List<FieldSchema> fields = extractor.extractSchema(client, hiveTable, database, table);
+        List<String> partitionKeys = extractor.extractPartitionKeys(hiveTable);
+        Map<String, String> options = extractor.extractOptions(hiveTable);
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
-                        .comment(hiveTableOptions.get("comment"))
-                        .options(paimonOptions)
-                        .partitionKeys(
-                                partitionFields.stream()
-                                        .map(FieldSchema::getName)
-                                        .collect(Collectors.toList()));
+                        .comment(options.get("comment"))
+                        .options(options)
+                        .partitionKeys(partitionKeys);
 
         fields.forEach(
                 field ->
@@ -161,23 +123,6 @@ public class HiveCloneUtils {
         return schemaBuilder.build();
     }
 
-    private static List<FieldSchema> getHiveSchema(
-            IMetaStoreClient client, Table hiveTable, String database, String table)
-            throws Exception {
-        List<FieldSchema> fields = client.getSchema(database, table);
-        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
-            return fields;
-        }
-
-        Set<String> hudiMetadataFields =
-                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
-                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
-                        .collect(Collectors.toSet());
-        return fields.stream()
-                .filter(f -> !hudiMetadataFields.contains(f.getName()))
-                .collect(Collectors.toList());
-    }
-
     public static List<HivePartitionFiles> listFiles(
             HiveCatalog hiveCatalog,
             Identifier identifier,
@@ -188,77 +133,15 @@ public class HiveCloneUtils {
             RowType partitionRowType,
             String defaultPartitionName,
             @Nullable PartitionPredicate predicate)
             throws Exception {
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         Table sourceTable =
                 client.getTable(identifier.getDatabaseName(), identifier.getTableName());
-
-        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
-            return HudiCloneUtils.listFiles(
-                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
-        } else {
-            return listFromPureHiveTable(
-                    client,
-                    identifier,
-                    sourceTable,
-                    hiveCatalog.fileIO(),
-                    partitionRowType,
-                    defaultPartitionName,
-                    predicate);
-        }
-    }
-
-    private static List<HivePartitionFiles> listFromPureHiveTable(
-            IMetaStoreClient client,
-            Identifier identifier,
-            Table sourceTable,
-            FileIO fileIO,
-            RowType partitionRowType,
-            String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
-            throws Exception {
-        List<Partition> partitions =
-                client.listPartitions(
-                        identifier.getDatabaseName(), identifier.getTableName(), Short.MAX_VALUE);
-        String format = parseFormat(sourceTable);
-
-        if (partitions.isEmpty()) {
-            String location = sourceTable.getSd().getLocation();
-            return Collections.singletonList(
-                    listFiles(fileIO, location, BinaryRow.EMPTY_ROW, format));
-        } else {
-            List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
-            partitionRowType
-                    .getFieldTypes()
-                    .forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));
-            List<HivePartitionFiles> results = new ArrayList<>();
-            for (Partition partition : partitions) {
-                List<String> partitionValues = partition.getValues();
-                String location = partition.getSd().getLocation();
-                BinaryRow partitionRow =
-                        FileMetaUtils.writePartitionValue(
-                                partitionRowType,
-                                partitionValues,
-                                valueSetters,
-                                defaultPartitionName);
-                if (predicate == null || predicate.test(partitionRow)) {
-                    results.add(listFiles(fileIO, location, partitionRow, format));
-                }
-            }
-            return results;
-        }
-    }
-
-    private static HivePartitionFiles listFiles(
-            FileIO fileIO, String location, BinaryRow partition, String format) throws IOException {
-        List<FileStatus> fileStatuses =
-                Arrays.stream(fileIO.listStatus(new Path(location)))
-                        .filter(s -> !s.isDir())
-                        .filter(HIDDEN_PATH_FILTER)
-                        .collect(Collectors.toList());
-        List<Path> paths = new ArrayList<>();
-        List<Long> fileSizes = new ArrayList<>();
-        for (FileStatus fileStatus : fileStatuses) {
-            paths.add(fileStatus.getPath());
-            fileSizes.add(fileStatus.getLen());
-        }
-        return new HivePartitionFiles(partition, paths, fileSizes, format);
+        return HiveCloneExtractor.getExtractor(sourceTable)
+                .extractFiles(
+                        hiveCatalog.getHmsClient(),
+                        sourceTable,
+                        hiveCatalog.fileIO(),
+                        identifier,
+                        partitionRowType,
+                        defaultPartitionName,
+                        predicate);
     }
 
     private static String parseFormat(StorageDescriptor storageDescriptor) {
@@ -288,40 +171,4 @@ public class HiveCloneUtils {
         }
         return format;
     }
-
-    private static String parseCompression(StorageDescriptor storageDescriptor) {
-        Map<String, String> serderParams = storageDescriptor.getSerdeInfo().getParameters();
-        if (serderParams.containsKey("compression")) {
-            return serderParams.get("compression");
-        }
-        return null;
-    }
-
-    private static String parseCompression(
-            Table table, String format, Map<String, String> formatOptions) {
-        String compression = null;
-        if (Objects.equals(format, "avro")) {
-            compression = formatOptions.getOrDefault("avro.codec", parseCompression(table.getSd()));
-        } else if (Objects.equals(format, "parquet")) {
-            compression =
-                    formatOptions.getOrDefault(
-                            "parquet.compression", parseCompression(table.getSd()));
-        } else if (Objects.equals(format, "orc")) {
-            compression =
-                    formatOptions.getOrDefault("orc.compress", parseCompression(table.getSd()));
-        }
-        return compression;
-    }
-
-    public static Map<String, String> getIdentifierPrefixOptions(
-            String formatIdentifier, Map<String, String> options) {
-        Map<String, String> result = new HashMap<>();
-        String prefix = formatIdentifier.toLowerCase() + ".";
-        for (String key : options.keySet()) {
-            if (key.toLowerCase().startsWith(prefix)) {
-                result.put(prefix + key.substring(prefix.length()), options.get(key));
-            }
-        }
-        return result;
-    }
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
similarity index 50%
copy from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
copy to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
index b3e53cd8d3..0b503b0086 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
@@ -24,23 +24,15 @@ import org.apache.paimon.data.BinaryWriter;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hudi.HudiCloneUtils;
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowType;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -52,156 +44,80 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
 
-/** Utils for cloning Hive table to Paimon table. */
-public class HiveCloneUtils {
+/** A {@link HiveCloneExtractor} for hive tables. */
+public class HiveTableCloneExtractor implements HiveCloneExtractor {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveCloneUtils.class);
+    public static final HiveTableCloneExtractor INSTANCE = new HiveTableCloneExtractor();
 
-    public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
-            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
-
-    public static Map<String, String> getDatabaseOptions(
-            HiveCatalog hiveCatalog, String databaseName) throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        Database database = client.getDatabase(databaseName);
-        Map<String, String> paimonOptions = new HashMap<>();
-        if (database.getDescription() != null) {
-            paimonOptions.put("comment", database.getDescription());
-        }
-        return paimonOptions;
-    }
-
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog) throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        List<Identifier> results = new ArrayList<>();
-        for (String database : client.getAllDatabases()) {
-            for (String table : client.getAllTables(database)) {
-                results.add(Identifier.create(database, table));
-            }
-        }
-        return results;
+    @Override
+    public boolean matches(Table table) {
+        return true;
     }
 
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog, String database)
+    @Override
+    public List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
             throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        List<Identifier> results = new ArrayList<>();
-        for (String table : client.getAllTables(database)) {
-            results.add(Identifier.create(database, table));
-        }
-        return results;
+        return client.getSchema(database, table);
     }
 
-    public static Schema hiveTableToPaimonSchema(HiveCatalog hiveCatalog, Identifier identifier)
+    @Override
+    public List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
             throws Exception {
-        String database = identifier.getDatabaseName();
-        String table = identifier.getObjectName();
-
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        // check primary key
-        PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(database, table);
-        try {
-            if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
-                throw new IllegalArgumentException("Can't migrate primary key table yet.");
-            }
-        } catch (Exception e) {
-            LOG.warn(
-                    "Your Hive version is low which not support get_primary_keys, skip primary key check firstly!");
-        }
+        return listFromPureHiveTable(
+                client,
+                identifier,
+                table,
+                fileIO,
+                partitionRowType,
+                defaultPartitionName,
+                predicate);
+    }
 
-        Table hiveTable = client.getTable(database, table);
-        List<FieldSchema> fields = getHiveSchema(client, hiveTable, database, table);
-        List<FieldSchema> partitionFields = hiveTable.getPartitionKeys();
-        Map<String, String> hiveTableOptions = hiveTable.getParameters();
+    @Override
+    public List<String> extractPartitionKeys(Table table) {
+        return table.getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+    }
 
+    @Override
+    public Map<String, String> extractOptions(Table table) {
+        Map<String, String> hiveTableOptions = table.getParameters();
         Map<String, String> paimonOptions = new HashMap<>();
-        // for compatible with hive comment system
-        if (hiveTableOptions.get("comment") != null) {
-            paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
+        String comment = hiveTableOptions.get("comment");
+        if (comment != null) {
+            paimonOptions.put("hive.comment", comment);
+            paimonOptions.put("comment", comment);
         }
 
-        String format = parseFormat(hiveTable);
+        String format = parseFormat(table);
         paimonOptions.put(FILE_FORMAT.key(), format);
         Map<String, String> formatOptions = getIdentifierPrefixOptions(format, hiveTableOptions);
         Map<String, String> sdFormatOptions =
-                getIdentifierPrefixOptions(
-                        format, hiveTable.getSd().getSerdeInfo().getParameters());
+                getIdentifierPrefixOptions(format, table.getSd().getSerdeInfo().getParameters());
         formatOptions.putAll(sdFormatOptions);
         paimonOptions.putAll(formatOptions);
 
-        String compression = parseCompression(hiveTable, format, formatOptions);
+        String compression = parseCompression(table, format, formatOptions);
         if (compression != null) {
             paimonOptions.put(FILE_COMPRESSION.key(), compression);
         }
-
-        Schema.Builder schemaBuilder =
-                Schema.newBuilder()
-                        .comment(hiveTableOptions.get("comment"))
-                        .options(paimonOptions)
-                        .partitionKeys(
-                                partitionFields.stream()
-                                        .map(FieldSchema::getName)
-                                        .collect(Collectors.toList()));
-
-        fields.forEach(
-                field ->
-                        schemaBuilder.column(
-                                field.getName(),
-                                toPaimonType(field.getType()),
-                                field.getComment()));
-
-        return schemaBuilder.build();
-    }
-
-    private static List<FieldSchema> getHiveSchema(
-            IMetaStoreClient client, Table hiveTable, String database, String table)
-            throws Exception {
-        List<FieldSchema> fields = client.getSchema(database, table);
-        if (!HudiCloneUtils.isHoodieTable(hiveTable)) {
-            return fields;
-        }
-
-        Set<String> hudiMetadataFields =
-                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
-                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
-                        .collect(Collectors.toSet());
-        return fields.stream()
-                .filter(f -> !hudiMetadataFields.contains(f.getName()))
-                .collect(Collectors.toList());
-    }
-
-    public static List<HivePartitionFiles> listFiles(
-            HiveCatalog hiveCatalog,
-            Identifier identifier,
-            RowType partitionRowType,
-            String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
-            throws Exception {
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        Table sourceTable =
-                client.getTable(identifier.getDatabaseName(), identifier.getTableName());
-
-        if (HudiCloneUtils.isHoodieTable(sourceTable)) {
-            return HudiCloneUtils.listFiles(
-                    sourceTable, hiveCatalog.fileIO(), partitionRowType, predicate);
-        } else {
-            return listFromPureHiveTable(
-                    client,
-                    identifier,
-                    sourceTable,
-                    hiveCatalog.fileIO(),
-                    partitionRowType,
-                    defaultPartitionName,
-                    predicate);
-        }
+        return paimonOptions;
     }
 
     private static List<HivePartitionFiles> listFromPureHiveTable(
@@ -211,7 +127,7 @@ public class HiveCloneUtils {
             FileIO fileIO,
             RowType partitionRowType,
             String defaultPartitionName,
-            @Nullable PartitionPredicate predicate)
+            @javax.annotation.Nullable PartitionPredicate predicate)
             throws Exception {
         List<Partition> partitions =
                 client.listPartitions(
@@ -261,34 +177,6 @@ public class HiveCloneUtils {
         return new HivePartitionFiles(partition, paths, fileSizes, format);
     }
 
-    private static String parseFormat(StorageDescriptor storageDescriptor) {
-        String serder = storageDescriptor.getSerdeInfo().toString();
-        if (serder.contains("avro")) {
-            return "avro";
-        } else if (serder.contains("parquet")) {
-            return "parquet";
-        } else if (serder.contains("orc")) {
-            return "orc";
-        }
-        return null;
-    }
-
-    public static String parseFormat(Table table) {
-        String format = parseFormat(table.getSd());
-        if (format == null) {
-            throw new UnsupportedOperationException("Unknown table format:" + table);
-        }
-        return format;
-    }
-
-    public static String parseFormat(Partition partition) {
-        String format = parseFormat(partition.getSd());
-        if (format == null) {
-            throw new UnsupportedOperationException("Unknown partition format: " + partition);
-        }
-        return format;
-    }
-
     private static String parseCompression(StorageDescriptor storageDescriptor) {
         Map<String, String> serderParams = storageDescriptor.getSerdeInfo().getParameters();
         if (serderParams.containsKey("compression")) {
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
deleted file mode 100644
index 2ab091411a..0000000000
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/HudiCloneUtils.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.hudi;
-
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.hive.clone.HivePartitionFiles;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.table.catalog.TableOptionProperties;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/**
- * Utils for cloning Hudi tables. Currently, only support COW Hudi tables in Hive metastore, and it
- * will be cloned to unaware-bucket table.
- */
-public class HudiCloneUtils {
-
-    public static boolean isHoodieTable(Table hiveTable) {
-        return hiveTable
-                .getParameters()
-                .getOrDefault(TableOptionProperties.SPARK_SOURCE_PROVIDER, "")
-                .equalsIgnoreCase("hudi");
-    }
-
-    public static List<HivePartitionFiles> listFiles(
-            Table hudiHiveTable,
-            FileIO fileIO,
-            RowType partitionType,
-            @Nullable PartitionPredicate partitionPredicate) {
-        Map<String, String> options = hudiHiveTable.getParameters();
-        checkTableType(options);
-
-        String location = hudiHiveTable.getSd().getLocation();
-        FileIndex fileIndex = new FileIndex(location, options, partitionType, partitionPredicate);
-
-        if (fileIndex.isPartitioned()) {
-            return fileIndex.getAllFilteredPartitionFiles(fileIO);
-        } else {
-            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
-        }
-    }
-
-    private static void checkTableType(Map<String, String> conf) {
-        String type = Configuration.fromMap(conf).get(TABLE_TYPE);
-        checkArgument(
-                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
-                "Only Hudi COW table is supported yet but found %s table.",
-                type);
-    }
-}
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index bdb1e5781f..f7d775761c 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -567,6 +567,13 @@ under the License.
         </dependency>
 
         <!-- for huid clone test -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hudi</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-flink1.18-bundle</artifactId>
diff --git a/paimon-hudi/pom.xml b/paimon-hudi/pom.xml
new file mode 100644
index 0000000000..836f1ca620
--- /dev/null
+++ b/paimon-hudi/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-hudi</artifactId>
+    <name>Paimon : Hudi</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${paimon-flink-common.flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
similarity index 98%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
rename to paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
index 7dbdbafc28..de0c93b50c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hudi/FileIndex.java
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiFileIndex.java
@@ -63,11 +63,10 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  *
  * @see org.apache.hudi.source.FileIndex
  */
-public class FileIndex {
+public class HudiFileIndex {
 
     private final Path path;
     private final HoodieMetadataConfig metadataConfig;
-    private final Configuration hadoopConf;
     private final RowType partitionType;
     @Nullable private final PartitionPredicate partitionPredicate;
     private final HoodieEngineContext engineContext;
@@ -77,14 +76,14 @@ public class FileIndex {
 
     private List<String> partitionPaths;
 
-    public FileIndex(
+    public HudiFileIndex(
             String location,
             Map<String, String> conf,
             RowType partitionType,
             @Nullable PartitionPredicate partitionPredicate) {
         this.path = new Path(location);
         this.metadataConfig = metadataConfig(conf);
-        this.hadoopConf =
+        Configuration hadoopConf =
                 HadoopConfigurations.getHadoopConf(
                         org.apache.flink.configuration.Configuration.fromMap(conf));
         this.partitionType = partitionType;
diff --git a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
new file mode 100644
index 0000000000..b83a493b18
--- /dev/null
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hudi;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.hive.clone.HiveCloneExtractor;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveTableCloneExtractor;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link HiveCloneExtractor} for Hudi tables. */
+public class HudiHiveCloneExtractor extends HiveTableCloneExtractor {
+
+    @Override
+    public boolean matches(Table table) {
+        return table.getParameters()
+                .getOrDefault("spark.sql.sources.provider", "")
+                .equalsIgnoreCase("hudi");
+    }
+
+    @Override
+    public List<FieldSchema> extractSchema(
+            IMetaStoreClient client, Table hiveTable, String database, String table)
+            throws Exception {
+        List<FieldSchema> fields = client.getSchema(database, table);
+        Set<String> hudiMetadataFields =
+                Arrays.stream(HoodieRecord.HoodieMetadataField.values())
+                        .map(HoodieRecord.HoodieMetadataField::getFieldName)
+                        .collect(Collectors.toSet());
+        return fields.stream()
+                .filter(f -> !hudiMetadataFields.contains(f.getName()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<HivePartitionFiles> extractFiles(
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate) {
+        Map<String, String> options = table.getParameters();
+        checkTableType(options);
+
+        String location = table.getSd().getLocation();
+        HudiFileIndex fileIndex =
+                new HudiFileIndex(location, options, partitionRowType, predicate);
+
+        if (fileIndex.isPartitioned()) {
+            return fileIndex.getAllFilteredPartitionFiles(fileIO);
+        } else {
+            return Collections.singletonList(fileIndex.getUnpartitionedFiles(fileIO));
+        }
+    }
+
+    private static void checkTableType(Map<String, String> conf) {
+        String type = conf.getOrDefault("table.type", HoodieTableType.COPY_ON_WRITE.name());
+        checkArgument(
+                HoodieTableType.valueOf(type) == HoodieTableType.COPY_ON_WRITE,
+                "Only Hudi COW table is supported yet but found %s table.",
+                type);
+    }
+}
diff --git a/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor b/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
new file mode 100644
index 0000000000..06d88ce0bb
--- /dev/null
+++ b/paimon-hudi/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.hudi.HudiHiveCloneExtractor
diff --git a/pom.xml b/pom.xml
index 82b472d699..f43193dfe5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@ under the License.
         <module>paimon-arrow</module>
         <module>tools/ci/paimon-ci-tools</module>
         <module>paimon-open-api</module>
+        <module>paimon-hudi</module>
     </modules>
 
     <properties>
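
Note on the extension point this commit introduces: HiveCloneUtils now delegates
to HiveCloneExtractor.getExtractor(table), which walks the service-loaded
EXTRACTORS list, returns the first extractor whose matches(...) is true, and
falls back to HiveTableCloneExtractor.INSTANCE for plain Hive tables. The new
paimon-hudi module plugs in HudiHiveCloneExtractor through the
META-INF/services registration added above. The following is a minimal sketch
of how another module could hook in the same way; the class name, package, and
the "example.clone.enabled" marker parameter are hypothetical, used only to
illustrate the SPI under the interfaces added by this commit.

    package com.example.clone; // hypothetical third-party package

    import org.apache.paimon.hive.clone.HiveTableCloneExtractor;

    import org.apache.hadoop.hive.metastore.api.Table;

    import java.util.Map;

    /**
     * Hypothetical extractor: claims Hive tables that carry a marker parameter
     * and otherwise reuses the default clone behavior it inherits.
     */
    public class TaggedHiveCloneExtractor extends HiveTableCloneExtractor {

        @Override
        public boolean matches(Table table) {
            // Claim only tables tagged with the (illustrative) marker parameter.
            return "true".equalsIgnoreCase(
                    table.getParameters().getOrDefault("example.clone.enabled", "false"));
        }

        @Override
        public Map<String, String> extractOptions(Table table) {
            // Extractors may enrich the Paimon options derived from the Hive table.
            Map<String, String> options = super.extractOptions(table);
            options.put("example.cloned-from", table.getDbName() + "." + table.getTableName());
            return options;
        }
    }

Registration would then be a single line naming the class in that module's own
META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor resource,
mirroring the file added for HudiHiveCloneExtractor.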
