This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f2054cde [core][bug] fix the issue of failing to get table with 
timeTravel semantics from catalog (#761)
4f2054cde is described below

commit 4f2054cdebde5e09e1cc457ce58a58c178c4209c
Author: liming30 <[email protected]>
AuthorDate: Tue Apr 4 15:23:31 2023 +0800

    [core][bug] fix the issue of failing to get table with timeTravel semantics 
from catalog (#761)
---
 .../paimon/table/AbstractFileStoreTable.java       |   6 +
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 202 ++++++++++++---------
 2 files changed, 120 insertions(+), 88 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7881d113a..e0427a23b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -61,6 +61,12 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     public AbstractFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
         this.fileIO = fileIO;
         this.path = path;
+        if (!tableSchema.options().containsKey(PATH.key())) {
+            // make sure table is always available
+            Map<String, String> newOptions = new 
HashMap<>(tableSchema.options());
+            newOptions.put(PATH.key(), path.toString());
+            tableSchema = tableSchema.copy(newOptions);
+        }
         this.tableSchema = tableSchema;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index b9c974957..1af579c61 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.Path;
@@ -43,9 +44,12 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -53,6 +57,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -94,38 +99,11 @@ public class FlinkCatalogTest {
                 null);
     }
 
-    private CatalogTable createStreamingTable() {
-        ResolvedSchema resolvedSchema = this.createSchema();
-        CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        Collections.emptyList(),
-                        this.getStreamingTableProperties());
-        return new ResolvedCatalogTable(origin, resolvedSchema);
-    }
-
     private List<String> createPartitionKeys() {
         return Arrays.asList("second", "third");
     }
 
-    private Map<String, String> getBatchTableProperties() {
-        return new HashMap<String, String>() {
-            {
-                this.put("is_streaming", "false");
-            }
-        };
-    }
-
-    private Map<String, String> getStreamingTableProperties() {
-        return new HashMap<String, String>() {
-            {
-                this.put("is_streaming", "true");
-            }
-        };
-    }
-
-    private CatalogTable createAnotherTable() {
+    private CatalogTable createAnotherTable(Map<String, String> options) {
         // TODO support change schema, modify it to createAnotherSchema
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
@@ -133,11 +111,11 @@ public class FlinkCatalogTest {
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         Collections.emptyList(),
-                        this.getBatchTableProperties());
+                        options);
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
-    private CatalogTable createAnotherPartitionedTable() {
+    private CatalogTable createAnotherPartitionedTable(Map<String, String> 
options) {
         // TODO support change schema, modify it to createAnotherSchema
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
@@ -145,39 +123,40 @@ public class FlinkCatalogTest {
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         this.createPartitionKeys(),
-                        this.getBatchTableProperties());
+                        options);
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
-    private CatalogTable createTable() {
+    private CatalogTable createTable(Map<String, String> options) {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
                 CatalogTable.of(
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         Collections.emptyList(),
-                        this.getBatchTableProperties());
+                        options);
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
-    private CatalogTable createPartitionedTable() {
+    private CatalogTable createPartitionedTable(Map<String, String> options) {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
                 CatalogTable.of(
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         this.createPartitionKeys(),
-                        this.getBatchTableProperties());
+                        options);
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
-    @Test
-    public void testAlterTable() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testAlterTable(Map<String, String> options) throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = this.createTable();
+        CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        CatalogTable newTable = this.createAnotherTable();
+        CatalogTable newTable = this.createAnotherTable(options);
         catalog.alterTable(this.path1, newTable, false);
         assertNotEquals(table, catalog.getTable(this.path1));
         checkEquals(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
@@ -186,11 +165,12 @@ public class FlinkCatalogTest {
         // Not support views
     }
 
-    @Test
-    public void testListTables() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testListTables(Map<String, String> options) throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        catalog.createTable(this.path1, this.createTable(), false);
-        catalog.createTable(this.path3, this.createTable(), false);
+        catalog.createTable(this.path1, this.createTable(options), false);
+        catalog.createTable(this.path3, this.createTable(options), false);
         assertEquals(2L, catalog.listTables("db1").size());
 
         // Not support views
@@ -201,10 +181,11 @@ public class FlinkCatalogTest {
         // TODO support this
     }
 
-    @Test
-    public void testCreateFlinkTable() {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreateFlinkTable(Map<String, String> options) {
         // create a flink table
-        CatalogTable table = createTable();
+        CatalogTable table = createTable(options);
         HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
         newOptions.put("connector", "filesystem");
         CatalogTable newTable = table.copy(newOptions);
@@ -216,29 +197,32 @@ public class FlinkCatalogTest {
                                 + " not 'filesystem' connector. You can create 
TEMPORARY table instead.");
     }
 
-    @Test
-    public void testCreateTable_Streaming() throws Exception {
+    @ParameterizedTest
+    @MethodSource("streamingOptionProvider")
+    public void testCreateTable_Streaming(Map<String, String> options) throws 
Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = createStreamingTable();
+        CatalogTable table = createTable(options);
         catalog.createTable(path1, table, false);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
     }
 
-    @Test
-    public void testAlterPartitionedTable() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testAlterPartitionedTable(Map<String, String> options) throws 
Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = this.createPartitionedTable();
+        CatalogTable table = this.createPartitionedTable(options);
         catalog.createTable(this.path1, table, false);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        CatalogTable newTable = this.createAnotherPartitionedTable();
+        CatalogTable newTable = this.createAnotherPartitionedTable(options);
         catalog.alterTable(this.path1, newTable, false);
         checkEquals(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
     }
 
-    @Test
-    public void testCreateTable_Batch() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreateTable_Batch(Map<String, String> options) throws 
Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = this.createTable();
+        CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
         CatalogBaseTable tableCreated = catalog.getTable(this.path1);
         checkEquals(path1, table, (CatalogTable) tableCreated);
@@ -249,20 +233,23 @@ public class FlinkCatalogTest {
         catalog.dropTable(this.path1, false);
     }
 
-    @Test
-    public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreateTable_TableAlreadyExist_ignored(Map<String, String> 
options)
+            throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = this.createTable();
+        CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        catalog.createTable(this.path1, this.createAnotherTable(), true);
+        catalog.createTable(this.path1, this.createAnotherTable(options), 
true);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
     }
 
-    @Test
-    public void testCreatePartitionedTable_Batch() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreatePartitionedTable_Batch(Map<String, String> options) 
throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        CatalogTable table = this.createPartitionedTable();
+        CatalogTable table = this.createPartitionedTable(options);
         catalog.createTable(this.path1, table, false);
         checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
         List<String> tables = catalog.listTables("db1");
@@ -270,10 +257,11 @@ public class FlinkCatalogTest {
         assertEquals(this.path1.getObjectName(), tables.get(0));
     }
 
-    @Test
-    public void testDropDb_DatabaseNotEmptyException() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testDropDb_DatabaseNotEmptyException(Map<String, String> 
options) throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path1, this.createTable(options), false);
         assertThatThrownBy(
                         () -> {
                             catalog.dropDatabase("db1", true, false);
@@ -282,11 +270,12 @@ public class FlinkCatalogTest {
                 .hasMessage("Database db1 in catalog test-catalog is not 
empty.");
     }
 
-    @Test
-    public void testTableExists() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testTableExists(Map<String, String> options) throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         assertFalse(catalog.tableExists(this.path1));
-        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path1, this.createTable(options), false);
         assertTrue(catalog.tableExists(this.path1));
 
         // system tables
@@ -300,9 +289,10 @@ public class FlinkCatalogTest {
                                 path1.getDatabaseName(), path1.getObjectName() 
+ "$unknown")));
     }
 
-    @Test
-    public void testAlterTable_TableNotExist_ignored() throws Exception {
-        catalog.alterTable(this.nonExistObjectPath, this.createTable(), true);
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testAlterTable_TableNotExist_ignored(Map<String, String> 
options) throws Exception {
+        catalog.alterTable(this.nonExistObjectPath, this.createTable(options), 
true);
         assertFalse(catalog.tableExists(this.nonExistObjectPath));
     }
 
@@ -311,19 +301,22 @@ public class FlinkCatalogTest {
         catalog.dropTable(this.nonExistObjectPath, true);
     }
 
-    @Test
-    public void testCreateTable_TableAlreadyExistException() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreateTable_TableAlreadyExistException(Map<String, String> 
options)
+            throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        catalog.createTable(this.path1, this.createTable(), false);
-        assertThatThrownBy(() -> catalog.createTable(this.path1, 
this.createTable(), false))
+        catalog.createTable(this.path1, this.createTable(options), false);
+        assertThatThrownBy(() -> catalog.createTable(this.path1, 
this.createTable(options), false))
                 .isInstanceOf(TableAlreadyExistException.class)
                 .hasMessage("Table (or view) db1.t1 already exists in Catalog 
test-catalog.");
     }
 
-    @Test
-    public void testDropTable_nonPartitionedTable() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testDropTable_nonPartitionedTable(Map<String, String> options) 
throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path1, this.createTable(options), false);
         assertTrue(catalog.tableExists(this.path1));
         catalog.dropTable(this.path1, false);
         assertFalse(catalog.tableExists(this.path1));
@@ -336,10 +329,11 @@ public class FlinkCatalogTest {
                 .hasMessage("Table (or view) db1.nonexist does not exist in 
Catalog test-catalog.");
     }
 
-    @Test
-    public void testDbExists() throws Exception {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testDbExists(Map<String, String> options) throws Exception {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
-        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path1, this.createTable(options), false);
         assertTrue(catalog.databaseExists("db1"));
     }
 
@@ -359,9 +353,14 @@ public class FlinkCatalogTest {
         catalog.dropDatabase("db1", true, false);
     }
 
-    @Test
-    public void testAlterTable_TableNotExistException() throws Exception {
-        assertThatThrownBy(() -> catalog.alterTable(this.nonExistDbPath, 
this.createTable(), false))
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testAlterTable_TableNotExistException(Map<String, String> 
options)
+            throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        this.nonExistDbPath, 
this.createTable(options), false))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage("Table (or view) non.exist does not exist in 
Catalog test-catalog.");
     }
@@ -411,11 +410,13 @@ public class FlinkCatalogTest {
                 .hasMessage("Create database with description is 
unsupported.");
     }
 
-    @Test
-    public void testCreateTable_DatabaseNotExistException() {
+    @ParameterizedTest
+    @MethodSource("batchOptionProvider")
+    public void testCreateTable_DatabaseNotExistException(Map<String, String> 
options) {
         assertThat(catalog.databaseExists(path1.getDatabaseName())).isFalse();
 
-        assertThatThrownBy(() -> catalog.createTable(nonExistObjectPath, 
createTable(), false))
+        assertThatThrownBy(
+                        () -> catalog.createTable(nonExistObjectPath, 
createTable(options), false))
                 .isInstanceOf(DatabaseNotExistException.class)
                 .hasMessage("Database db1 does not exist in Catalog 
test-catalog.");
     }
@@ -445,4 +446,29 @@ public class FlinkCatalogTest {
         assertThat(t2.isPartitioned()).isEqualTo(t1.isPartitioned());
         assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
     }
+
+    static Stream<Map<String, String>> streamingOptionProvider() {
+        return optionProvider(true);
+    }
+
+    static Stream<Map<String, String>> batchOptionProvider() {
+        return optionProvider(false);
+    }
+
+    private static Stream<Map<String, String>> optionProvider(boolean 
isStreaming) {
+        List<Map<String, String>> allOptions = new ArrayList<>();
+        for (CoreOptions.StartupMode mode : CoreOptions.StartupMode.values()) {
+            Map<String, String> options = new HashMap<>();
+            options.put("is_streaming", String.valueOf(isStreaming));
+            options.put("scan.mode", mode.toString());
+            if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT
+                    || mode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+                options.put("scan.snapshot-id", "1");
+            } else if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+                options.put("scan.timestamp-millis", 
System.currentTimeMillis() + "");
+            }
+            allOptions.add(options);
+        }
+        return allOptions.stream();
+    }
 }

Reply via email to