This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c2386cfb2 [flink] Check that cannot set non-existed fallback branch (#4106)
c2386cfb2 is described below

commit c2386cfb23144e0963eb34280b0ded0704c0c0fe
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 30 21:24:14 2024 +0800

    [flink] Check that cannot set non-existed fallback branch (#4106)
---
 .../java/org/apache/paimon/schema/SchemaManager.java  |  1 +
 .../org/apache/paimon/schema/SchemaValidation.java    | 13 +++++++++++++
 .../java/org/apache/paimon/flink/BranchSqlITCase.java | 19 +++++++++++++++++++
 3 files changed, 33 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index cfa6eb5d2..5d4adeb40 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -535,6 +535,7 @@ public class SchemaManager implements Serializable {
     @VisibleForTesting
     boolean commit(TableSchema newSchema) throws Exception {
         SchemaValidation.validateTableSchema(newSchema);
+        SchemaValidation.validateFallbackBranch(this, newSchema);
         Path schemaPath = toSchemaPath(newSchema.id());
         Callable<Boolean> callable =
                 () -> fileIO.tryToWriteAtomic(schemaPath, newSchema.toString());
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b3a2bf11c..72ea96024 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -196,6 +197,18 @@ public class SchemaValidation {
         }
     }
 
+    public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
+        String fallbackBranch = schema.options().get(CoreOptions.SCAN_FALLBACK_BRANCH.key());
+        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+            checkArgument(
+                    schemaManager.copyWithBranch(fallbackBranch).latest().isPresent(),
+                    "Cannot set '%s' = '%s' because the branch '%s' isn't existed.",
+                    CoreOptions.SCAN_FALLBACK_BRANCH.key(),
+                    fallbackBranch,
+                    fallbackBranch);
+        }
+    }
+
     private static void validateOnlyContainPrimitiveType(
             List<DataField> fields, List<String> fieldNames, String errorMessageIntro) {
         if (!fieldNames.isEmpty()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 36a1a11c9..6970eb043 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -510,6 +511,24 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder("+I[[1], 2, 2]", "+I[[2], 3, 2]");
     }
 
+    @Test
+    public void testCannotSetEmptyFallbackBranch() {
+        String errMsg =
+                "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' isn't existed.";
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE t1 (a INT, b INT) WITH ('scan.fallback-branch' = 'test')"))
+                .satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
+
+        assertThatThrownBy(
+                        () -> {
+                            sql("CREATE TABLE t2 (a INT, b INT)");
+                            sql("ALTER TABLE t2 SET ('scan.fallback-branch' = 'test')");
+                        })
+                .satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to