This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2e9f77330 [flink] Fix the bug that Flink will throw
UnsupportedOperationException when using TableDescriptor in Flink jobs (#2334)
2e9f77330 is described below
commit 2e9f7733004d8a64291bf11a1570b9275c417f34
Author: Lijie Wang <[email protected]>
AuthorDate: Fri Nov 17 14:10:39 2023 +0800
[flink] Fix the bug that Flink will throw UnsupportedOperationException
when using TableDescriptor in Flink jobs (#2334)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 4 +-
.../org/apache/paimon/flink/FlinkCatalogTest.java | 61 ++++++++++++++++++----
2 files changed, 54 insertions(+), 11 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b49d748de..84c628f9e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -301,7 +301,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
Identifier identifier = toIdentifier(tablePath);
- Map<String, String> options = table.getOptions();
+ // the returned value of "table.getOptions" may be unmodifiable (for
example from
+ // TableDescriptor)
+ Map<String, String> options = new HashMap<>(table.getOptions());
Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable)
table, options);
boolean unRegisterLogSystem = false;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 0e69c2672..afae1a1ab 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.options.Options;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -67,9 +68,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static
org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
@@ -181,11 +184,11 @@ public class FlinkCatalogTest {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ checkCreateTable(path1, table, (CatalogTable)
catalog.getTable(this.path1));
CatalogTable newTable = this.createAnotherTable(options);
catalog.alterTable(this.path1, newTable, false);
assertThat(catalog.getTable(this.path1)).isNotEqualTo(table);
- checkEquals(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
+ checkAlterTable(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
catalog.dropTable(this.path1, false);
// Not support views
@@ -227,7 +230,7 @@ public class FlinkCatalogTest {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = createTable(options);
catalog.createTable(path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
+ checkCreateTable(path1, table, (CatalogTable) catalog.getTable(path1));
}
@ParameterizedTest
@@ -236,10 +239,10 @@ public class FlinkCatalogTest {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = this.createPartitionedTable(options);
catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ checkCreateTable(path1, table, (CatalogTable)
catalog.getTable(this.path1));
CatalogTable newTable = this.createAnotherPartitionedTable(options);
catalog.alterTable(this.path1, newTable, false);
- checkEquals(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
+ checkAlterTable(path1, newTable, (CatalogTable)
catalog.getTable(this.path1));
}
@ParameterizedTest
@@ -249,7 +252,7 @@ public class FlinkCatalogTest {
CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
CatalogBaseTable tableCreated = catalog.getTable(this.path1);
- checkEquals(path1, table, (CatalogTable) tableCreated);
+ checkCreateTable(path1, table, (CatalogTable) tableCreated);
assertThat(tableCreated.getDescription().get()).isEqualTo("test
comment");
List<String> tables = catalog.listTables("db1");
assertThat(tables.size()).isEqualTo(1L);
@@ -264,9 +267,9 @@ public class FlinkCatalogTest {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = this.createTable(options);
catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ checkCreateTable(path1, table, (CatalogTable)
catalog.getTable(this.path1));
catalog.createTable(this.path1, this.createAnotherTable(options),
true);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ checkCreateTable(path1, table, (CatalogTable)
catalog.getTable(this.path1));
}
@ParameterizedTest
@@ -275,7 +278,7 @@ public class FlinkCatalogTest {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = this.createPartitionedTable(options);
catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ checkCreateTable(path1, table, (CatalogTable)
catalog.getTable(this.path1));
List<String> tables = catalog.listTables("db1");
assertThat(tables.size()).isEqualTo(1L);
assertThat(tables.get(0)).isEqualTo(this.path1.getObjectName());
@@ -479,6 +482,7 @@ public class FlinkCatalogTest {
Map<String, String> expected = got.getOptions();
expected.remove("path");
+ expected.remove(FlinkCatalogOptions.REGISTER_TIMEOUT.key());
assertThat(catalogTable.getOptions()).isEqualTo(expected);
}
@@ -557,12 +561,49 @@ public class FlinkCatalogTest {
"Creating table in default database is disabled,
please specify a database name.");
}
- private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable
t2) {
+ @Test
+ void testCreateTableFromTableDescriptor() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+ final ResolvedSchema resolvedSchema = this.createSchema();
+ final TableDescriptor tableDescriptor =
+ TableDescriptor.forConnector("paimon")
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .build();
+ final CatalogTable catalogTable =
+ new ResolvedCatalogTable(tableDescriptor.toCatalogTable(),
resolvedSchema);
+ catalog.createTable(path1, catalogTable, false);
+ checkCreateTable(path1, catalogTable, (CatalogTable)
catalog.getTable(path1));
+ }
+
+ private void checkCreateTable(ObjectPath path, CatalogTable expected,
CatalogTable actual) {
+ checkEquals(
+ path,
+ expected,
+ actual,
+ Collections.singletonMap(
+ FlinkCatalogOptions.REGISTER_TIMEOUT.key(),
+
FlinkCatalogOptions.REGISTER_TIMEOUT.defaultValue().toString()),
+ Collections.singleton(CONNECTOR.key()));
+ }
+
+ private void checkAlterTable(ObjectPath path, CatalogTable expected,
CatalogTable actual) {
+ checkEquals(path, expected, actual, Collections.emptyMap(),
Collections.emptySet());
+ }
+
+ private void checkEquals(
+ ObjectPath path,
+ CatalogTable t1,
+ CatalogTable t2,
+ Map<String, String> optionsToAdd,
+ Set<String> optionsToRemove) {
Path tablePath =
((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
.getDataTableLocation(FlinkCatalog.toIdentifier(path));
Map<String, String> options = new HashMap<>(t1.getOptions());
options.put("path", tablePath.toString());
+ options.putAll(optionsToAdd);
+ optionsToRemove.forEach(options::remove);
t1 = ((ResolvedCatalogTable) t1).copy(options);
checkEquals(t1, t2);
}