This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e9f77330 [flink] Fix the bug that Flink will throw 
UnsupportedOperationException when using TableDescriptor in Flink jobs (#2334)
2e9f77330 is described below

commit 2e9f7733004d8a64291bf11a1570b9275c417f34
Author: Lijie Wang <[email protected]>
AuthorDate: Fri Nov 17 14:10:39 2023 +0800

    [flink] Fix the bug that Flink will throw UnsupportedOperationException 
when using TableDescriptor in Flink jobs (#2334)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  4 +-
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 61 ++++++++++++++++++----
 2 files changed, 54 insertions(+), 11 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b49d748de..84c628f9e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -301,7 +301,9 @@ public class FlinkCatalog extends AbstractCatalog {
         }
 
         Identifier identifier = toIdentifier(tablePath);
-        Map<String, String> options = table.getOptions();
+        // the returned value of "table.getOptions" may be unmodifiable (for 
example from
+        // TableDescriptor)
+        Map<String, String> options = new HashMap<>(table.getOptions());
         Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) 
table, options);
 
         boolean unRegisterLogSystem = false;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 0e69c2672..afae1a1ab 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.options.Options;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -67,9 +68,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static 
org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
@@ -181,11 +184,11 @@ public class FlinkCatalogTest {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        checkCreateTable(path1, table, (CatalogTable) 
catalog.getTable(this.path1));
         CatalogTable newTable = this.createAnotherTable(options);
         catalog.alterTable(this.path1, newTable, false);
         assertThat(catalog.getTable(this.path1)).isNotEqualTo(table);
-        checkEquals(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
+        checkAlterTable(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
         catalog.dropTable(this.path1, false);
 
         // Not support views
@@ -227,7 +230,7 @@ public class FlinkCatalogTest {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         CatalogTable table = createTable(options);
         catalog.createTable(path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
+        checkCreateTable(path1, table, (CatalogTable) catalog.getTable(path1));
     }
 
     @ParameterizedTest
@@ -236,10 +239,10 @@ public class FlinkCatalogTest {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         CatalogTable table = this.createPartitionedTable(options);
         catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        checkCreateTable(path1, table, (CatalogTable) 
catalog.getTable(this.path1));
         CatalogTable newTable = this.createAnotherPartitionedTable(options);
         catalog.alterTable(this.path1, newTable, false);
-        checkEquals(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
+        checkAlterTable(path1, newTable, (CatalogTable) 
catalog.getTable(this.path1));
     }
 
     @ParameterizedTest
@@ -249,7 +252,7 @@ public class FlinkCatalogTest {
         CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
         CatalogBaseTable tableCreated = catalog.getTable(this.path1);
-        checkEquals(path1, table, (CatalogTable) tableCreated);
+        checkCreateTable(path1, table, (CatalogTable) tableCreated);
         assertThat(tableCreated.getDescription().get()).isEqualTo("test 
comment");
         List<String> tables = catalog.listTables("db1");
         assertThat(tables.size()).isEqualTo(1L);
@@ -264,9 +267,9 @@ public class FlinkCatalogTest {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         CatalogTable table = this.createTable(options);
         catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        checkCreateTable(path1, table, (CatalogTable) 
catalog.getTable(this.path1));
         catalog.createTable(this.path1, this.createAnotherTable(options), 
true);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        checkCreateTable(path1, table, (CatalogTable) 
catalog.getTable(this.path1));
     }
 
     @ParameterizedTest
@@ -275,7 +278,7 @@ public class FlinkCatalogTest {
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         CatalogTable table = this.createPartitionedTable(options);
         catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        checkCreateTable(path1, table, (CatalogTable) 
catalog.getTable(this.path1));
         List<String> tables = catalog.listTables("db1");
         assertThat(tables.size()).isEqualTo(1L);
         assertThat(tables.get(0)).isEqualTo(this.path1.getObjectName());
@@ -479,6 +482,7 @@ public class FlinkCatalogTest {
 
         Map<String, String> expected = got.getOptions();
         expected.remove("path");
+        expected.remove(FlinkCatalogOptions.REGISTER_TIMEOUT.key());
         assertThat(catalogTable.getOptions()).isEqualTo(expected);
     }
 
@@ -557,12 +561,49 @@ public class FlinkCatalogTest {
                         "Creating table in default database is disabled, 
please specify a database name.");
     }
 
-    private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable 
t2) {
+    @Test
+    void testCreateTableFromTableDescriptor() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+        final ResolvedSchema resolvedSchema = this.createSchema();
+        final TableDescriptor tableDescriptor =
+                TableDescriptor.forConnector("paimon")
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .build();
+        final CatalogTable catalogTable =
+                new ResolvedCatalogTable(tableDescriptor.toCatalogTable(), 
resolvedSchema);
+        catalog.createTable(path1, catalogTable, false);
+        checkCreateTable(path1, catalogTable, (CatalogTable) 
catalog.getTable(path1));
+    }
+
+    private void checkCreateTable(ObjectPath path, CatalogTable expected, 
CatalogTable actual) {
+        checkEquals(
+                path,
+                expected,
+                actual,
+                Collections.singletonMap(
+                        FlinkCatalogOptions.REGISTER_TIMEOUT.key(),
+                        
FlinkCatalogOptions.REGISTER_TIMEOUT.defaultValue().toString()),
+                Collections.singleton(CONNECTOR.key()));
+    }
+
+    private void checkAlterTable(ObjectPath path, CatalogTable expected, 
CatalogTable actual) {
+        checkEquals(path, expected, actual, Collections.emptyMap(), 
Collections.emptySet());
+    }
+
+    private void checkEquals(
+            ObjectPath path,
+            CatalogTable t1,
+            CatalogTable t2,
+            Map<String, String> optionsToAdd,
+            Set<String> optionsToRemove) {
         Path tablePath =
                 ((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
                         .getDataTableLocation(FlinkCatalog.toIdentifier(path));
         Map<String, String> options = new HashMap<>(t1.getOptions());
         options.put("path", tablePath.toString());
+        options.putAll(optionsToAdd);
+        optionsToRemove.forEach(options::remove);
         t1 = ((ResolvedCatalogTable) t1).copy(options);
         checkEquals(t1, t2);
     }

Reply via email to