This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0e53989c4 [core] Move schema validation of FallbackReadFileStoreTable from constructor to scanner (#3966)
0e53989c4 is described below
commit 0e53989c44defd4d44fb67f907ec9a3899608397
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 15:10:03 2024 +0800
[core] Move schema validation of FallbackReadFileStoreTable from constructor to scanner (#3966)
---
.../paimon/table/FallbackReadFileStoreTable.java | 111 +++++++++++----------
.../org/apache/paimon/flink/BranchSqlITCase.java | 13 ++-
2 files changed, 67 insertions(+), 57 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f7238f033..ead4452b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -63,60 +63,6 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
Preconditions.checkArgument(!(main instanceof
FallbackReadFileStoreTable));
Preconditions.checkArgument(!(fallback instanceof
FallbackReadFileStoreTable));
-
- String mainBranch = main.coreOptions().branch();
- String fallbackBranch = fallback.coreOptions().branch();
- RowType mainRowType = main.schema().logicalRowType();
- RowType fallbackRowType = fallback.schema().logicalRowType();
- Preconditions.checkArgument(
- sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
- "Branch %s and %s does not have the same row type.\n"
- + "Row type of branch %s is %s.\n"
- + "Row type of branch %s is %s.",
- mainBranch,
- fallbackBranch,
- mainBranch,
- mainRowType,
- fallbackBranch,
- fallbackRowType);
-
- List<String> mainPrimaryKeys = main.schema().primaryKeys();
- List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
- if (!mainPrimaryKeys.isEmpty()) {
- if (fallbackPrimaryKeys.isEmpty()) {
- throw new IllegalArgumentException(
- "Branch "
- + mainBranch
- + " has primary keys while fallback branch "
- + fallbackBranch
- + " does not. This is not allowed.");
- }
- Preconditions.checkArgument(
- mainPrimaryKeys.equals(fallbackPrimaryKeys),
- "Branch %s and %s both have primary keys but are not the
same.\n"
- + "Primary keys of %s are %s.\n"
- + "Primary keys of %s are %s.",
- mainBranch,
- fallbackBranch,
- mainBranch,
- mainPrimaryKeys,
- fallbackBranch,
- fallbackPrimaryKeys);
- }
- }
-
- private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType
fallbackRowType) {
- if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
- return false;
- }
- for (int i = 0; i < mainRowType.getFieldCount(); i++) {
- DataType mainType = mainRowType.getFields().get(i).type();
- DataType fallbackType = fallbackRowType.getFields().get(i).type();
- if (!mainType.equalsIgnoreNullable(fallbackType)) {
- return false;
- }
- }
- return true;
}
@Override
@@ -187,9 +133,66 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
@Override
public DataTableScan newScan() {
+ validateSchema();
return new Scan();
}
+ private void validateSchema() {
+ String mainBranch = wrapped.coreOptions().branch();
+ String fallbackBranch = fallback.coreOptions().branch();
+ RowType mainRowType = wrapped.schema().logicalRowType();
+ RowType fallbackRowType = fallback.schema().logicalRowType();
+ Preconditions.checkArgument(
+ sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
+ "Branch %s and %s does not have the same row type.\n"
+ + "Row type of branch %s is %s.\n"
+ + "Row type of branch %s is %s.",
+ mainBranch,
+ fallbackBranch,
+ mainBranch,
+ mainRowType,
+ fallbackBranch,
+ fallbackRowType);
+
+ List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
+ List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
+ if (!mainPrimaryKeys.isEmpty()) {
+ if (fallbackPrimaryKeys.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Branch "
+ + mainBranch
+ + " has primary keys while fallback branch "
+ + fallbackBranch
+ + " does not. This is not allowed.");
+ }
+ Preconditions.checkArgument(
+ mainPrimaryKeys.equals(fallbackPrimaryKeys),
+ "Branch %s and %s both have primary keys but are not the
same.\n"
+ + "Primary keys of %s are %s.\n"
+ + "Primary keys of %s are %s.",
+ mainBranch,
+ fallbackBranch,
+ mainBranch,
+ mainPrimaryKeys,
+ fallbackBranch,
+ fallbackPrimaryKeys);
+ }
+ }
+
+ private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType
fallbackRowType) {
+ if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+ return false;
+ }
+ for (int i = 0; i < mainRowType.getFieldCount(); i++) {
+ DataType mainType = mainRowType.getFields().get(i).type();
+ DataType fallbackType = fallbackRowType.getFields().get(i).type();
+ if (!mainType.equalsIgnoreNullable(fallbackType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private class Scan implements DataTableScan {
private final DataTableScan mainScan;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index d50f36ccd..36a1a11c9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -336,16 +336,23 @@ public class BranchSqlITCase extends CatalogITCaseBase {
}
@Test
- public void testDifferentRowTypes() {
+ public void testDifferentRowTypes() throws Exception {
sql(
"CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING )
PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
sql("CALL sys.create_branch('default.t', 'pk')");
sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket'
= '2' )");
sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
- sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+ sql("INSERT INTO t VALUES (1, 10, 'apple')");
+ sql("INSERT INTO `t$branch_pk` VALUES (1, 10, 'cat', 100)");
- assertThatThrownBy(() -> sql("INSERT INTO t VALUES (1, 10, 'apple')"))
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+ assertThatThrownBy(() -> sql("SELECT * FROM t"))
.hasMessageContaining("Branch main and pk does not have the
same row type");
+
+ sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )");
+ assertThat(collectResult("SELECT v, k FROM
t")).containsExactlyInAnyOrder("+I[apple, 10]");
+ assertThat(collectResult("SELECT v, v2, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder("+I[cat, 100, 10]");
}
@Test