This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b8e5a4b164 [core] Fix bug when both main branch and fallback branch have no primary key (#5665)
b8e5a4b164 is described below
commit b8e5a4b16495150730b0876f563f348e78339dff
Author: tsreaper <[email protected]>
AuthorDate: Tue May 27 17:13:07 2025 +0800
[core] Fix bug when both main branch and fallback branch have no primary key (#5665)
---
.../paimon/table/FallbackReadFileStoreTable.java | 63 +++++++++++++++++++---
.../org/apache/paimon/table/source/DataSplit.java | 2 +-
.../org/apache/paimon/flink/BranchSqlITCase.java | 22 ++++++++
3 files changed, 79 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 07d837bd42..4ceabed38b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -25,6 +25,10 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
@@ -49,11 +53,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -233,6 +240,49 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return true;
}
+ private static class FallbackDataSplit extends DataSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean isFallback;
+
+ private FallbackDataSplit(DataSplit dataSplit, boolean isFallback) {
+ assign(dataSplit);
+ this.isFallback = isFallback;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o) && isFallback == ((FallbackDataSplit)
o).isFallback;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), isFallback);
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ serialize(new DataOutputViewStreamWrapper(out));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ FallbackDataSplit split = deserialize(new
DataInputViewStreamWrapper(in));
+ assign(split);
+ this.isFallback = split.isFallback;
+ }
+
+ @Override
+ public void serialize(DataOutputView out) throws IOException {
+ super.serialize(out);
+ out.writeBoolean(isFallback);
+ }
+
+ public static FallbackDataSplit deserialize(DataInputView in) throws
IOException {
+ DataSplit dataSplit = DataSplit.deserialize(in);
+ return new FallbackDataSplit(dataSplit, in.readBoolean());
+ }
+ }
+
/** Scan implementation for {@link FallbackReadFileStoreTable}. */
public static class FallbackReadScan implements DataTableScan {
@@ -320,7 +370,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
Set<BinaryRow> completePartitions = new HashSet<>();
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
- splits.add(dataSplit);
+ splits.add(new FallbackDataSplit(dataSplit, false));
completePartitions.add(dataSplit.partition());
}
@@ -331,7 +381,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
if (!remainingPartitions.isEmpty()) {
fallbackScan.withPartitionFilter(remainingPartitions);
for (Split split : fallbackScan.plan().splits()) {
- splits.add((DataSplit) split);
+ splits.add(new FallbackDataSplit((DataSplit) split, true));
}
}
return new DataFilePlan(splits);
@@ -405,11 +455,10 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
- DataSplit dataSplit = (DataSplit) split;
- if (!dataSplit.dataFiles().isEmpty()
- && dataSplit.dataFiles().get(0).minKey().getFieldCount() >
0) {
+ FallbackDataSplit dataSplit = (FallbackDataSplit) split;
+ if (dataSplit.isFallback) {
try {
- return fallbackRead.createReader(split);
+ return fallbackRead.createReader(dataSplit);
} catch (Exception ignored) {
LOG.error(
"Reading from fallback branch has problems for
files: {}",
@@ -418,7 +467,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
.collect(Collectors.joining(", ")));
}
}
- return mainRead.createReader(split);
+ return mainRead.createReader(dataSplit);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index c34fca6997..f4491618d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -312,7 +312,7 @@ public class DataSplit implements Split {
assign(deserialize(new DataInputViewStreamWrapper(in)));
}
- private void assign(DataSplit other) {
+ protected void assign(DataSplit other) {
this.snapshotId = other.snapshotId;
this.partition = other.partition;
this.bucket = other.bucket;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index ec161e76ac..57a6746fea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -655,6 +655,28 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"+I[2, 20, dog, 2]");
}
+ @Test
+ public void testMainAndFallbackNoPrimaryKeys() throws Exception {
+ sql("CREATE TABLE t ( pt INT, v INT ) PARTITIONED BY (pt) WITH (
'bucket' = '-1' )");
+ sql("INSERT INTO t VALUES (1, 110)");
+ sql("CALL sys.create_branch('default.t', 'test')");
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' )");
+ sql("INSERT INTO `t$branch_test` VALUES (2, 210)");
+ assertThat(collectResult("SELECT * FROM t"))
+ .containsExactlyInAnyOrder("+I[1, 110]", "+I[2, 210]");
+
+ sql("ALTER TABLE t ADD v2 INT");
+ sql("ALTER TABLE `t$branch_test` ADD v2 INT");
+ sql("INSERT INTO t VALUES (1, 120, 1200)");
+ sql("INSERT INTO `t$branch_test` VALUES (2, 220, 2200)");
+ assertThat(collectResult("SELECT * FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 110, null]",
+ "+I[2, 210, null]",
+ "+I[1, 120, 1200]",
+ "+I[2, 220, 2200]");
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {