This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae5c4cf6b0 [core] Also check row type if schema id not match in
RemoteLookupFileManager (#6597)
ae5c4cf6b0 is described below
commit ae5c4cf6b0d6131cdf049dfdc28fc2bca4835a16
Author: tsreaper <[email protected]>
AuthorDate: Fri Nov 14 12:37:06 2025 +0800
[core] Also check row type if schema id not match in
RemoteLookupFileManager (#6597)
---
.../mergetree/compact/RemoteLookupFileManager.java | 21 ++++++++++++++++++---
.../paimon/operation/KeyValueFileStoreWrite.java | 5 ++++-
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
index 23aed652c8..fa2c2c51f9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -27,7 +27,9 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.LookupLevels.RemoteFileDownloader;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowType;
import org.apache.commons.io.IOUtils;
@@ -36,7 +38,9 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/** Manager to manage remote files for lookup. */
public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
@@ -45,17 +49,22 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
private final DataFilePathFactory pathFactory;
private final TableSchema schema;
private final LookupLevels<T> lookupLevels;
+ private final SchemaManager schemaManager;
+ private final Map<Long, RowType> schemaRowTypes;
public RemoteLookupFileManager(
FileIO fileIO,
DataFilePathFactory pathFactory,
TableSchema schema,
- LookupLevels<T> lookupLevels) {
+ LookupLevels<T> lookupLevels,
+ SchemaManager schemaManager) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
this.schema = schema;
this.lookupLevels = lookupLevels;
this.lookupLevels.setRemoteFileDownloader(this);
+ this.schemaManager = schemaManager;
+ this.schemaRowTypes = new HashMap<>();
}
public DataFileMeta genRemoteLookupFile(DataFileMeta file) throws
IOException {
@@ -81,8 +90,14 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
@Override
public boolean tryToDownload(DataFileMeta dataFile, File localFile) {
- if (dataFile.schemaId() != schema.id()) {
- return false;
+ long schemaId = dataFile.schemaId();
+ if (schemaId != schema.id()) {
+ if (!schemaRowTypes.containsKey(schemaId)) {
+ schemaRowTypes.put(schemaId,
schemaManager.schema(schemaId).logicalRowType());
+ }
+ if (!schema.logicalRowType().equals(schemaRowTypes.get(schemaId)))
{
+ return false;
+ }
}
String remoteSstName = lookupLevels.remoteSstName(dataFile.fileName());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index ee012b11ad..4c85ca7f0a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -113,6 +113,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
private final FileIO fileIO;
+ private final SchemaManager schemaManager;
private final RowType partitionType;
private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
@@ -148,6 +149,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
dvMaintainerFactory,
tableName);
this.fileIO = fileIO;
+ this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
@@ -378,7 +380,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
fileIO,
keyReaderFactory.pathFactory(),
keyReaderFactory.schema(),
- lookupLevels);
+ lookupLevels,
+ schemaManager);
}
return new LookupMergeTreeCompactRewriter(
maxLevel,