This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4f2054cde [core][bug] fix the issue of failing to get table with
timeTravel semantics from catalog (#761)
4f2054cde is described below
commit 4f2054cdebde5e09e1cc457ce58a58c178c4209c
Author: liming30 <[email protected]>
AuthorDate: Tue Apr 4 15:23:31 2023 +0800
[core][bug] fix the issue of failing to get table with timeTravel semantics
from catalog (#761)
---
.../paimon/table/AbstractFileStoreTable.java | 6 +
.../org/apache/paimon/flink/FlinkCatalogTest.java | 202 ++++++++++++---------
2 files changed, 120 insertions(+), 88 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7881d113a..e0427a23b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -61,6 +61,12 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
public AbstractFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
this.fileIO = fileIO;
this.path = path;
+ if (!tableSchema.options().containsKey(PATH.key())) {
+ // make sure the table path option is always present in the schema options
+ Map<String, String> newOptions = new
HashMap<>(tableSchema.options());
+ newOptions.put(PATH.key(), path.toString());
+ tableSchema = tableSchema.copy(newOptions);
+ }
this.tableSchema = tableSchema;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index b9c974957..1af579c61 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.Path;
@@ -43,9 +44,12 @@ import
org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -53,6 +57,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -94,38 +99,11 @@ public class FlinkCatalogTest {
null);
}
- private CatalogTable createStreamingTable() {
- ResolvedSchema resolvedSchema = this.createSchema();
- CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- Collections.emptyList(),
- this.getStreamingTableProperties());
- return new ResolvedCatalogTable(origin, resolvedSchema);
- }
-
private List<String> createPartitionKeys() {
return Arrays.asList("second", "third");
}
- private Map<String, String> getBatchTableProperties() {
- return new HashMap<String, String>() {
- {
- this.put("is_streaming", "false");
- }
- };
- }
-
- private Map<String, String> getStreamingTableProperties() {
- return new HashMap<String, String>() {
- {
- this.put("is_streaming", "true");
- }
- };
- }
-
- private CatalogTable createAnotherTable() {
+ private CatalogTable createAnotherTable(Map<String, String> options) {
// TODO support change schema, modify it to createAnotherSchema
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
@@ -133,11 +111,11 @@ public class FlinkCatalogTest {
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
- this.getBatchTableProperties());
+ options);
return new ResolvedCatalogTable(origin, resolvedSchema);
}
- private CatalogTable createAnotherPartitionedTable() {
+ private CatalogTable createAnotherPartitionedTable(Map<String, String>
options) {
// TODO support change schema, modify it to createAnotherSchema
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
@@ -145,39 +123,40 @@ public class FlinkCatalogTest {
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
this.createPartitionKeys(),
- this.getBatchTableProperties());
+ options);
return new ResolvedCatalogTable(origin, resolvedSchema);
}
- private CatalogTable createTable() {
+ private CatalogTable createTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
- this.getBatchTableProperties());
+ options);
return new ResolvedCatalogTable(origin, resolvedSchema);
}
- private CatalogTable createPartitionedTable() {
+ private CatalogTable createPartitionedTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
this.createPartitionKeys(),
- this.getBatchTableProperties());
+ options);
return new ResolvedCatalogTable(origin, resolvedSchema);
}
- @Test
- public void testAlterTable() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testAlterTable(Map<String, String> options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = this.createTable();
+ CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- CatalogTable newTable = this.createAnotherTable();
+ CatalogTable newTable = this.createAnotherTable(options);
catalog.alterTable(this.path1, newTable, false);
assertNotEquals(table, catalog.getTable(this.path1));
checkEquals(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
@@ -186,11 +165,12 @@ public class FlinkCatalogTest {
// Not support views
}
- @Test
- public void testListTables() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testListTables(Map<String, String> options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- catalog.createTable(this.path1, this.createTable(), false);
- catalog.createTable(this.path3, this.createTable(), false);
+ catalog.createTable(this.path1, this.createTable(options), false);
+ catalog.createTable(this.path3, this.createTable(options), false);
assertEquals(2L, catalog.listTables("db1").size());
// Not support views
@@ -201,10 +181,11 @@ public class FlinkCatalogTest {
// TODO support this
}
- @Test
- public void testCreateFlinkTable() {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreateFlinkTable(Map<String, String> options) {
// create a flink table
- CatalogTable table = createTable();
+ CatalogTable table = createTable(options);
HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
newOptions.put("connector", "filesystem");
CatalogTable newTable = table.copy(newOptions);
@@ -216,29 +197,32 @@ public class FlinkCatalogTest {
+ " not 'filesystem' connector. You can create
TEMPORARY table instead.");
}
- @Test
- public void testCreateTable_Streaming() throws Exception {
+ @ParameterizedTest
+ @MethodSource("streamingOptionProvider")
+ public void testCreateTable_Streaming(Map<String, String> options) throws
Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = createStreamingTable();
+ CatalogTable table = createTable(options);
catalog.createTable(path1, table, false);
checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
}
- @Test
- public void testAlterPartitionedTable() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testAlterPartitionedTable(Map<String, String> options) throws
Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = this.createPartitionedTable();
+ CatalogTable table = this.createPartitionedTable(options);
catalog.createTable(this.path1, table, false);
checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- CatalogTable newTable = this.createAnotherPartitionedTable();
+ CatalogTable newTable = this.createAnotherPartitionedTable(options);
catalog.alterTable(this.path1, newTable, false);
checkEquals(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
}
- @Test
- public void testCreateTable_Batch() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreateTable_Batch(Map<String, String> options) throws
Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = this.createTable();
+ CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
CatalogBaseTable tableCreated = catalog.getTable(this.path1);
checkEquals(path1, table, (CatalogTable) tableCreated);
@@ -249,20 +233,23 @@ public class FlinkCatalogTest {
catalog.dropTable(this.path1, false);
}
- @Test
- public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreateTable_TableAlreadyExist_ignored(Map<String, String>
options)
+ throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = this.createTable();
+ CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- catalog.createTable(this.path1, this.createAnotherTable(), true);
+ catalog.createTable(this.path1, this.createAnotherTable(options),
true);
checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
}
- @Test
- public void testCreatePartitionedTable_Batch() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreatePartitionedTable_Batch(Map<String, String> options)
throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- CatalogTable table = this.createPartitionedTable();
+ CatalogTable table = this.createPartitionedTable(options);
catalog.createTable(this.path1, table, false);
checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
List<String> tables = catalog.listTables("db1");
@@ -270,10 +257,11 @@ public class FlinkCatalogTest {
assertEquals(this.path1.getObjectName(), tables.get(0));
}
- @Test
- public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testDropDb_DatabaseNotEmptyException(Map<String, String>
options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- catalog.createTable(this.path1, this.createTable(), false);
+ catalog.createTable(this.path1, this.createTable(options), false);
assertThatThrownBy(
() -> {
catalog.dropDatabase("db1", true, false);
@@ -282,11 +270,12 @@ public class FlinkCatalogTest {
.hasMessage("Database db1 in catalog test-catalog is not
empty.");
}
- @Test
- public void testTableExists() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testTableExists(Map<String, String> options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
assertFalse(catalog.tableExists(this.path1));
- catalog.createTable(this.path1, this.createTable(), false);
+ catalog.createTable(this.path1, this.createTable(options), false);
assertTrue(catalog.tableExists(this.path1));
// system tables
@@ -300,9 +289,10 @@ public class FlinkCatalogTest {
path1.getDatabaseName(), path1.getObjectName()
+ "$unknown")));
}
- @Test
- public void testAlterTable_TableNotExist_ignored() throws Exception {
- catalog.alterTable(this.nonExistObjectPath, this.createTable(), true);
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testAlterTable_TableNotExist_ignored(Map<String, String>
options) throws Exception {
+ catalog.alterTable(this.nonExistObjectPath, this.createTable(options),
true);
assertFalse(catalog.tableExists(this.nonExistObjectPath));
}
@@ -311,19 +301,22 @@ public class FlinkCatalogTest {
catalog.dropTable(this.nonExistObjectPath, true);
}
- @Test
- public void testCreateTable_TableAlreadyExistException() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreateTable_TableAlreadyExistException(Map<String, String>
options)
+ throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- catalog.createTable(this.path1, this.createTable(), false);
- assertThatThrownBy(() -> catalog.createTable(this.path1,
this.createTable(), false))
+ catalog.createTable(this.path1, this.createTable(options), false);
+ assertThatThrownBy(() -> catalog.createTable(this.path1,
this.createTable(options), false))
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage("Table (or view) db1.t1 already exists in Catalog
test-catalog.");
}
- @Test
- public void testDropTable_nonPartitionedTable() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testDropTable_nonPartitionedTable(Map<String, String> options)
throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- catalog.createTable(this.path1, this.createTable(), false);
+ catalog.createTable(this.path1, this.createTable(options), false);
assertTrue(catalog.tableExists(this.path1));
catalog.dropTable(this.path1, false);
assertFalse(catalog.tableExists(this.path1));
@@ -336,10 +329,11 @@ public class FlinkCatalogTest {
.hasMessage("Table (or view) db1.nonexist does not exist in
Catalog test-catalog.");
}
- @Test
- public void testDbExists() throws Exception {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testDbExists(Map<String, String> options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
- catalog.createTable(this.path1, this.createTable(), false);
+ catalog.createTable(this.path1, this.createTable(options), false);
assertTrue(catalog.databaseExists("db1"));
}
@@ -359,9 +353,14 @@ public class FlinkCatalogTest {
catalog.dropDatabase("db1", true, false);
}
- @Test
- public void testAlterTable_TableNotExistException() throws Exception {
- assertThatThrownBy(() -> catalog.alterTable(this.nonExistDbPath,
this.createTable(), false))
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testAlterTable_TableNotExistException(Map<String, String>
options)
+ throws Exception {
+ assertThatThrownBy(
+ () ->
+ catalog.alterTable(
+ this.nonExistDbPath,
this.createTable(options), false))
.isInstanceOf(TableNotExistException.class)
.hasMessage("Table (or view) non.exist does not exist in
Catalog test-catalog.");
}
@@ -411,11 +410,13 @@ public class FlinkCatalogTest {
.hasMessage("Create database with description is
unsupported.");
}
- @Test
- public void testCreateTable_DatabaseNotExistException() {
+ @ParameterizedTest
+ @MethodSource("batchOptionProvider")
+ public void testCreateTable_DatabaseNotExistException(Map<String, String>
options) {
assertThat(catalog.databaseExists(path1.getDatabaseName())).isFalse();
- assertThatThrownBy(() -> catalog.createTable(nonExistObjectPath,
createTable(), false))
+ assertThatThrownBy(
+ () -> catalog.createTable(nonExistObjectPath,
createTable(options), false))
.isInstanceOf(DatabaseNotExistException.class)
.hasMessage("Database db1 does not exist in Catalog
test-catalog.");
}
@@ -445,4 +446,29 @@ public class FlinkCatalogTest {
assertThat(t2.isPartitioned()).isEqualTo(t1.isPartitioned());
assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
}
+
+ static Stream<Map<String, String>> streamingOptionProvider() {
+ return optionProvider(true);
+ }
+
+ static Stream<Map<String, String>> batchOptionProvider() {
+ return optionProvider(false);
+ }
+
+ private static Stream<Map<String, String>> optionProvider(boolean
isStreaming) {
+ List<Map<String, String>> allOptions = new ArrayList<>();
+ for (CoreOptions.StartupMode mode : CoreOptions.StartupMode.values()) {
+ Map<String, String> options = new HashMap<>();
+ options.put("is_streaming", String.valueOf(isStreaming));
+ options.put("scan.mode", mode.toString());
+ if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT
+ || mode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+ options.put("scan.snapshot-id", "1");
+ } else if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+ options.put("scan.timestamp-millis",
System.currentTimeMillis() + "");
+ }
+ allOptions.add(options);
+ }
+ return allOptions.stream();
+ }
}