This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e53989c4 [core] Move schema validation of FallbackReadFileStoreTable 
from constructor to scanner (#3966)
0e53989c4 is described below

commit 0e53989c44defd4d44fb67f907ec9a3899608397
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 15:10:03 2024 +0800

    [core] Move schema validation of FallbackReadFileStoreTable from 
constructor to scanner (#3966)
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 111 +++++++++++----------
 .../org/apache/paimon/flink/BranchSqlITCase.java   |  13 ++-
 2 files changed, 67 insertions(+), 57 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f7238f033..ead4452b0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -63,60 +63,6 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
         Preconditions.checkArgument(!(main instanceof 
FallbackReadFileStoreTable));
         Preconditions.checkArgument(!(fallback instanceof 
FallbackReadFileStoreTable));
-
-        String mainBranch = main.coreOptions().branch();
-        String fallbackBranch = fallback.coreOptions().branch();
-        RowType mainRowType = main.schema().logicalRowType();
-        RowType fallbackRowType = fallback.schema().logicalRowType();
-        Preconditions.checkArgument(
-                sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
-                "Branch %s and %s does not have the same row type.\n"
-                        + "Row type of branch %s is %s.\n"
-                        + "Row type of branch %s is %s.",
-                mainBranch,
-                fallbackBranch,
-                mainBranch,
-                mainRowType,
-                fallbackBranch,
-                fallbackRowType);
-
-        List<String> mainPrimaryKeys = main.schema().primaryKeys();
-        List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
-        if (!mainPrimaryKeys.isEmpty()) {
-            if (fallbackPrimaryKeys.isEmpty()) {
-                throw new IllegalArgumentException(
-                        "Branch "
-                                + mainBranch
-                                + " has primary keys while fallback branch "
-                                + fallbackBranch
-                                + " does not. This is not allowed.");
-            }
-            Preconditions.checkArgument(
-                    mainPrimaryKeys.equals(fallbackPrimaryKeys),
-                    "Branch %s and %s both have primary keys but are not the 
same.\n"
-                            + "Primary keys of %s are %s.\n"
-                            + "Primary keys of %s are %s.",
-                    mainBranch,
-                    fallbackBranch,
-                    mainBranch,
-                    mainPrimaryKeys,
-                    fallbackBranch,
-                    fallbackPrimaryKeys);
-        }
-    }
-
-    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
fallbackRowType) {
-        if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
-            return false;
-        }
-        for (int i = 0; i < mainRowType.getFieldCount(); i++) {
-            DataType mainType = mainRowType.getFields().get(i).type();
-            DataType fallbackType = fallbackRowType.getFields().get(i).type();
-            if (!mainType.equalsIgnoreNullable(fallbackType)) {
-                return false;
-            }
-        }
-        return true;
     }
 
     @Override
@@ -187,9 +133,66 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
     @Override
     public DataTableScan newScan() {
+        validateSchema();
         return new Scan();
     }
 
+    private void validateSchema() {
+        String mainBranch = wrapped.coreOptions().branch();
+        String fallbackBranch = fallback.coreOptions().branch();
+        RowType mainRowType = wrapped.schema().logicalRowType();
+        RowType fallbackRowType = fallback.schema().logicalRowType();
+        Preconditions.checkArgument(
+                sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
+                "Branch %s and %s does not have the same row type.\n"
+                        + "Row type of branch %s is %s.\n"
+                        + "Row type of branch %s is %s.",
+                mainBranch,
+                fallbackBranch,
+                mainBranch,
+                mainRowType,
+                fallbackBranch,
+                fallbackRowType);
+
+        List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
+        List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
+        if (!mainPrimaryKeys.isEmpty()) {
+            if (fallbackPrimaryKeys.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Branch "
+                                + mainBranch
+                                + " has primary keys while fallback branch "
+                                + fallbackBranch
+                                + " does not. This is not allowed.");
+            }
+            Preconditions.checkArgument(
+                    mainPrimaryKeys.equals(fallbackPrimaryKeys),
+                    "Branch %s and %s both have primary keys but are not the 
same.\n"
+                            + "Primary keys of %s are %s.\n"
+                            + "Primary keys of %s are %s.",
+                    mainBranch,
+                    fallbackBranch,
+                    mainBranch,
+                    mainPrimaryKeys,
+                    fallbackBranch,
+                    fallbackPrimaryKeys);
+        }
+    }
+
+    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
fallbackRowType) {
+        if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+            return false;
+        }
+        for (int i = 0; i < mainRowType.getFieldCount(); i++) {
+            DataType mainType = mainRowType.getFields().get(i).type();
+            DataType fallbackType = fallbackRowType.getFields().get(i).type();
+            if (!mainType.equalsIgnoreNullable(fallbackType)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private class Scan implements DataTableScan {
 
         private final DataTableScan mainScan;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index d50f36ccd..36a1a11c9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -336,16 +336,23 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testDifferentRowTypes() {
+    public void testDifferentRowTypes() throws Exception {
         sql(
                 "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) 
PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
         sql("CALL sys.create_branch('default.t', 'pk')");
         sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' 
= '2' )");
         sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
-        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+        sql("INSERT INTO t VALUES (1, 10, 'apple')");
+        sql("INSERT INTO `t$branch_pk` VALUES (1, 10, 'cat', 100)");
 
-        assertThatThrownBy(() -> sql("INSERT INTO t VALUES (1, 10, 'apple')"))
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+        assertThatThrownBy(() -> sql("SELECT * FROM t"))
                 .hasMessageContaining("Branch main and pk does not have the 
same row type");
+
+        sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )");
+        assertThat(collectResult("SELECT v, k FROM 
t")).containsExactlyInAnyOrder("+I[apple, 10]");
+        assertThat(collectResult("SELECT v, v2, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder("+I[cat, 100, 10]");
     }
 
     @Test

Reply via email to