This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aadfb6d83 [flink] Fix that schema validation fails when using 'scan.x' options to do time travel on schema changed tables (#2543)
aadfb6d83 is described below

commit aadfb6d83de7636752d5773dbfc6e8abf24e0682
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 21 09:47:54 2023 +0800

    [flink] Fix that schema validation fails when using 'scan.x' options to do time travel on schema changed tables (#2543)
---
 .../org/apache/paimon/utils/DateTimeUtils.java     |  2 +-
 .../paimon/table/AbstractFileStoreTable.java       | 27 ++++++++++++++----
 .../org/apache/paimon/table/FileStoreTable.java    |  3 ++
 .../paimon/flink/AbstractFlinkTableFactory.java    |  8 ++++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 30 ++++++++++++++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 32 ++++++++++++++++++++++
 6 files changed, 93 insertions(+), 9 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index ad48b2d14..16d84bc95 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -407,7 +407,7 @@ public class DateTimeUtils {
     // Format
     // --------------------------------------------------------------------------------------------
 
-    private static String formatTimestamp(Timestamp ts, int precision) {
+    public static String formatTimestamp(Timestamp ts, int precision) {
         LocalDateTime ldt = ts.toLocalDateTime();
 
         String fraction = pad(9, ldt.getNano());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7e206d787..14bc66316 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -163,6 +163,22 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        checkImmutability(dynamicOptions);
+        return copyInternal(dynamicOptions, true);
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        checkImmutability(dynamicOptions);
+        return copyInternal(dynamicOptions, false);
+    }
+
+    @Override
+    public FileStoreTable internalCopyWithoutCheck(Map<String, String> 
dynamicOptions) {
+        return copyInternal(dynamicOptions, true);
+    }
+
+    private void checkImmutability(Map<String, String> dynamicOptions) {
         Map<String, String> options = tableSchema.options();
         // check option is not immutable
         dynamicOptions.forEach(
@@ -171,12 +187,9 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                         SchemaManager.checkAlterTableOption(k);
                     }
                 });
-
-        return internalCopyWithoutCheck(dynamicOptions);
     }
 
-    @Override
-    public FileStoreTable internalCopyWithoutCheck(Map<String, String> 
dynamicOptions) {
+    private FileStoreTable copyInternal(Map<String, String> dynamicOptions, 
boolean tryTimeTravel) {
         Map<String, String> options = new HashMap<>(tableSchema.options());
 
         // merge non-null dynamic options into schema.options
@@ -203,8 +216,10 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         // validate schema with new options
         SchemaValidation.validateTableSchema(newTableSchema);
 
-        // see if merged options contain time travel option
-        newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+        if (tryTimeTravel) {
+            // see if merged options contain time travel option
+            newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+        }
 
         return copy(newTableSchema);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 860bb91be..5ff78a197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -79,6 +79,9 @@ public interface FileStoreTable extends DataTable {
     @Override
     FileStoreTable copy(Map<String, String> dynamicOptions);
 
+    /** Doesn't change table schema even when there exists time travel scan 
options. */
+    FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
+
     /** Sometimes we have to change some Immutable options to implement 
features. */
     FileStoreTable internalCopyWithoutCheck(Map<String, String> 
dynamicOptions);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index cc559c190..782a9804e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -248,10 +248,14 @@ public abstract class AbstractFlinkTableFactory
         newOptions.putAll(origin.getOptions());
         newOptions.putAll(dynamicOptions);
 
+        // notice that the Paimon table schema must be the same with the 
Flink's
         if (origin instanceof DataCatalogTable) {
-            table = ((DataCatalogTable) origin).table().copy(newOptions);
+            FileStoreTable fileStoreTable = (FileStoreTable) 
((DataCatalogTable) origin).table();
+            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
         } else {
-            table = 
FileStoreTableFactory.create(createCatalogContext(context)).copy(newOptions);
+            table =
+                    FileStoreTableFactory.create(createCatalogContext(context))
+                            .copyWithoutTimeTravel(newOptions);
         }
 
         Schema schema = 
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ac68dcad7..0a1e06473 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -386,4 +387,33 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                         Row.ofKind(RowKind.INSERT, 1, "B"), 
Row.ofKind(RowKind.INSERT, 2, "B"));
         iterator.close();
     }
+
+    @Test
+    public void testScanFromOldSchema() throws InterruptedException {
+        sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 
STRING)");
+
+        sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");
+
+        Thread.sleep(1_000);
+        long timestamp = System.currentTimeMillis();
+
+        sql("ALTER TABLE select_old ADD f2 STRING");
+        sql("INSERT INTO select_old VALUES (3, 'c', 'C')");
+
+        // this way will initialize source with the latest schema
+        assertThat(
+                        sql(
+                                "SELECT * FROM select_old /*+ 
OPTIONS('scan.timestamp-millis'='%s') */",
+                                timestamp))
+                // old schema doesn't have column f2
+                .containsExactlyInAnyOrder(Row.of(1, "a", null), Row.of(2, 
"b", null));
+
+        // this way will initialize source with time-travelled schema
+        assertThat(
+                        sql(
+                                "SELECT * FROM select_old FOR SYSTEM_TIME AS 
OF TIMESTAMP '%s'",
+                                DateTimeUtils.formatTimestamp(
+                                        DateTimeUtils.toInternal(timestamp, 
0), 0)))
+                .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index b056c9e86..2e88dcbb1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -521,4 +522,35 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         sql("INSERT INTO ignore_delete VALUES (1, 'B')");
         assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B"));
     }
+
+    @Test
+    public void testScanFromOldSchema() throws Exception {
+        sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 
STRING)");
+
+        sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");
+
+        Thread.sleep(1_000);
+        long timestamp = System.currentTimeMillis();
+
+        sql("ALTER TABLE select_old ADD f2 STRING");
+        sql("INSERT INTO select_old VALUES (3, 'c', 'C')");
+
+        // this way will initialize source with the latest schema
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM select_old /*+ 
OPTIONS('scan.timestamp-millis'='%s') */",
+                                timestamp));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, 
"c", "C"));
+        iterator.close();
+
+        // this way will initialize source with time-travelled schema
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM select_old FOR SYSTEM_TIME AS 
OF TIMESTAMP '%s'",
+                                DateTimeUtils.formatTimestamp(
+                                        DateTimeUtils.toInternal(timestamp, 
0), 0)));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, 
"c"));
+    }
 }

Reply via email to