This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bfd0a0a8b4 [core] Throw exception if increment query with rescale bucket (#4984)
bfd0a0a8b4 is described below

commit bfd0a0a8b48e75465ade9ff2c14c995b4c4ee706
Author: yuzelin <[email protected]>
AuthorDate: Thu Jan 23 10:25:00 2025 +0800

    [core] Throw exception if increment query with rescale bucket (#4984)
---
 .../paimon/table/source/AbstractDataTableScan.java |  5 +++-
 .../snapshot/IncrementalTagStartingScanner.java    | 14 ++++++++++-
 .../table/source/snapshot/TimeTravelUtil.java      | 24 ++++++++++++++++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 29 ++++++++++++++++++++++
 4 files changed, 68 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 59b11281cc..5bb9ba1378 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -245,7 +245,10 @@ public abstract class AbstractDataTableScan implements DataTableScan {
 
         Options conf = options.toConfiguration();
         TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+                new TagManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch());
         if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
             Pair<String, String> incrementalBetween = 
options.incrementalBetween();
             Optional<Tag> startTag = 
tagManager.get(incrementalBetween.getLeft());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 8c4b5d5cec..835b7595a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.tag.TagPeriodHandler;
 import org.apache.paimon.utils.Pair;
@@ -50,6 +51,14 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
         this.start = start;
         this.end = end;
         this.startingSnapshotId = start.id();
+
+        TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery(
+                new SchemaManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch()),
+                start.schemaId(),
+                end.schemaId());
     }
 
     @Override
@@ -66,7 +75,10 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
                 endTagName);
 
         TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+                new TagManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch());
 
         Optional<Tag> endTag = tagManager.get(endTagName);
         if (!endTag.isPresent()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 5b4ee4e58c..4a0f4290df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -20,8 +20,9 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.utils.TagManager;
@@ -30,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** The util class of resolve snapshot from scan params for time travel. */
 public class TimeTravelUtil {
@@ -58,7 +60,7 @@ public class TimeTravelUtil {
             return snapshotManager.latestSnapshot();
         }
 
-        Preconditions.checkArgument(
+        checkArgument(
                 scanHandleKey.size() == 1,
                 String.format(
                         "Only one of the following parameters may be set : 
[%s, %s, %s, %s]",
@@ -124,4 +126,22 @@ public class TimeTravelUtil {
                 new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         return tagManager.getOrThrow(tagName).trimToSnapshot();
     }
+
+    public static void checkRescaleBucketForIncrementalTagQuery(
+            SchemaManager schemaManager, long schemaId1, long schemaId2) {
+        if (schemaId1 != schemaId2) {
+            int bucketNumber1 = bucketNumber(schemaManager, schemaId1);
+            int bucketNumber2 = bucketNumber(schemaManager, schemaId2);
+            checkArgument(
+                    bucketNumber1 == bucketNumber2,
+                    "The bucket number of two tags are different (%s, %s), 
which is not supported in incremental tag query.",
+                    bucketNumber1,
+                    bucketNumber2);
+        }
+    }
+
+    private static int bucketNumber(SchemaManager schemaManager, long 
schemaId) {
+        TableSchema schema = schemaManager.schema(schemaId);
+        return CoreOptions.fromMap(schema.options()).bucket();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ee24dc8ef3..d9a5cab1d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SnapshotNotExistException;
@@ -37,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -652,6 +654,33 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), 
Row.of(2, 22, 222));
     }
 
+    @Test
+    public void testIncrementTagQueryWithRescaleBucket() throws Exception {
+        sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH 
('bucket' = '1')");
+        Table table = paimonTable("test");
+
+        sql("INSERT INTO test VALUES (1, 11), (2, 22)");
+        sql("ALTER TABLE test SET ('bucket' = '2')");
+        sql("INSERT OVERWRITE test SELECT * FROM test");
+        sql("INSERT INTO test VALUES (3, 33)");
+
+        table.createTag("2024-01-01", 1);
+        table.createTag("2024-01-02", 3);
+
+        List<String> incrementalOptions =
+                Arrays.asList(
+                        "'incremental-between'='2024-01-01,2024-01-02'",
+                        "'incremental-to-auto-tag'='2024-01-02'");
+
+        for (String option : incrementalOptions) {
+            assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) 
*/", option))
+                    .satisfies(
+                            anyCauseMatches(
+                                    IllegalArgumentException.class,
+                                    "The bucket number of two tags are 
different (1, 2), which is not supported in incremental tag query."));
+        }
+    }
+
     private void validateCount1PushDown(String sql) {
         Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
         while (!transformation.getInputs().isEmpty()) {

Reply via email to