This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2b5efa322 [hotfix] Extract some common codes of clone hive and migrate hive (#5497)
e2b5efa322 is described below
commit e2b5efa322e7d4ca569ae750a10c0b7607fa3a14
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 21 14:13:06 2025 +0800
[hotfix] Extract some common codes of clone hive and migrate hive (#5497)
---
.../org/apache/paimon/migrate/FileMetaUtils.java | 32 ++++++-------
.../flink/clone/hive/CopyHiveFilesFunction.java | 56 ++++------------------
.../apache/paimon/hive/migrate/HiveCloneUtils.java | 4 +-
.../apache/paimon/hive/migrate/HiveMigrator.java | 19 +-------
4 files changed, 29 insertions(+), 82 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index e05c6ecfb5..54eff281cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -104,19 +104,7 @@ public class FileMetaUtils {
Map<Path, Path> rollback) {
try {
- CoreOptions options = ((FileStoreTable) table).coreOptions();
- SimpleColStatsCollector.Factory[] factories =
- StatsCollectorFactories.createStatsFactories(
- options.statsMode(), options, table.rowType().getFieldNames());
-
- SimpleStatsExtractor simpleStatsExtractor =
- FileFormat.fromIdentifier(format, options.toConfiguration())
- .createStatsExtractor(table.rowType(), factories)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Can't get table stats extractor for format "
- + format));
+ SimpleStatsExtractor simpleStatsExtractor = createSimpleStatsExtractor(table, format);
Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
return constructFileMeta(
newPath.getName(),
@@ -169,8 +157,6 @@ public class FileMetaUtils {
}
}
- // -----------------------------private method---------------------------------------------
-
private static Path renameFile(
FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
throws IOException {
@@ -184,7 +170,7 @@ public class FileMetaUtils {
return newPath;
}
- private static DataFileMeta constructFileMeta(
+ public static DataFileMeta constructFileMeta(
String fileName,
long fileSize,
Path path,
@@ -258,4 +244,18 @@ public class FileMetaUtils {
binaryRowWriter.complete();
return binaryRow;
}
+
+ public static SimpleStatsExtractor createSimpleStatsExtractor(Table table, String format) {
+ CoreOptions options = ((FileStoreTable) table).coreOptions();
+ SimpleColStatsCollector.Factory[] factories =
+ StatsCollectorFactories.createStatsFactories(
+ options.statsMode(), options, table.rowType().getFieldNames());
+
+ return FileFormat.fromIdentifier(format, options.toConfiguration())
+ .createStatsExtractor(table.rowType(), factories)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Can't get table stats extractor for format " + format));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
index 20bca892dd..700bccbf1c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
@@ -18,29 +18,19 @@
package org.apache.paimon.flink.clone.hive;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StatsCollectorFactories;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-import java.util.Collections;
import java.util.Map;
/** Copy files for table. */
@@ -60,36 +50,12 @@ public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo, Da
Collector<DataFileInfo> collector)
throws Exception {
Identifier identifier = cloneFileInfo.identifier();
- long fileSize = cloneFileInfo.fileSize();
String format = cloneFileInfo.format();
Path path = cloneFileInfo.path();
BinaryRow partition = cloneFileInfo.partition();
FileIO sourceFileIO = hiveCatalog.fileIO();
FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
- // util for collecting stats
- CoreOptions options = targetTable.coreOptions();
- SimpleColStatsCollector.Factory[] factories =
- StatsCollectorFactories.createStatsFactories(
- options.statsMode(), options, targetTable.rowType().getFieldNames());
-
- SimpleStatsExtractor simpleStatsExtractor =
- FileFormat.fromIdentifier(format, options.toConfiguration())
- .createStatsExtractor(targetTable.rowType(), factories)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Can't get table stats extractor for format "
- + format));
- RowType rowTypeWithSchemaId =
- targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
-
- SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);
-
- // extract stats
- Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
- simpleStatsExtractor.extractWithFileInfo(sourceFileIO, path, fileSize);
- SimpleStats stats = statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
// new file name
String suffix = "." + format;
@@ -103,20 +69,16 @@ public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo, Da
targetTable.fileIO().newOutputStream(new Path(targetFilePath, newFileName), false));
// to DataFileMeta
+ SimpleStatsExtractor simpleStatsExtractor =
+ FileMetaUtils.createSimpleStatsExtractor(targetTable, format);
DataFileMeta dataFileMeta =
- DataFileMeta.forAppend(
+ FileMetaUtils.constructFileMeta(
newFileName,
- fileSize,
- fileInfo.getRight().getRowCount(),
- stats,
- 0,
- 0,
- targetTable.schema().id(),
- Collections.emptyList(),
- null,
- FileSource.APPEND,
- null,
- null);
+ cloneFileInfo.fileSize(),
+ path,
+ simpleStatsExtractor,
+ sourceFileIO,
+ targetTable);
collector.collect(
new DataFileInfo(
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 8532cbecf6..5c982968f0 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -57,7 +57,7 @@ public class HiveCloneUtils {
private static final Logger LOG = LoggerFactory.getLogger(HiveCloneUtils.class);
- private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+ public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
public static List<Identifier> listTables(HiveCatalog hiveCatalog) throws Exception {
@@ -186,7 +186,7 @@ public class HiveCloneUtils {
return new HivePartitionFiles(partition, paths, fileSizes, format);
}
- private static String parseFormat(String serder) {
+ public static String parseFormat(String serder) {
if (serder.contains("avro")) {
return "avro";
} else if (serder.contains("parquet")) {
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 6450cb87c2..68215c9c1b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryWriter;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.io.DataFileMeta;
@@ -56,10 +55,11 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+import static org.apache.paimon.hive.migrate.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.migrate.HiveCloneUtils.parseFormat;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
@@ -69,9 +69,6 @@ public class HiveMigrator implements Migrator {
private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
private ThreadPoolExecutor executor;
- private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
- p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
-
private static final String PAIMON_SUFFIX = "_paimon_";
private final FileIO fileIO;
@@ -362,18 +359,6 @@ public class HiveMigrator implements Migrator {
}
}
- private String parseFormat(String serder) {
- if (serder.contains("avro")) {
- return "avro";
- } else if (serder.contains("parquet")) {
- return "parquet";
- } else if (serder.contains("orc")) {
- return "orc";
- } else {
- throw new UnsupportedOperationException("Unknown partition format: " + serder);
- }
- }
-
/** One import task for one partition. */
public static class MigrateTask implements Callable<CommitMessage> {