This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 96e4bbf239 [flink] by default, use paimon catalog to create views in flink generic catalog (#5611)
96e4bbf239 is described below

commit 96e4bbf239b1f0f1e0fcc8907e1b00429aa1cf17
Author: Kerwin <[email protected]>
AuthorDate: Mon May 19 11:54:37 2025 +0800

    [flink] by default, use paimon catalog to create views in flink generic catalog (#5611)
---
 .../java/org/apache/paimon/flink/FlinkGenericCatalog.java     | 10 ++++++++++
 .../org/apache/paimon/hive/FlinkGenericCatalogITCase.java     | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
index 75af5917bb..5f7a05d885 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -179,6 +180,15 @@ public class FlinkGenericCatalog extends AbstractCatalog {
     @Override
     public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        // create view
+        // TODO: By default, use the Paimon catalog to create the view. Create views with options is
+        // not supported.
+        if (table instanceof CatalogView) {
+            paimon.createTable(tablePath, table, ignoreIfExists);
+            return;
+        }
+
+        // create table
         String connector = table.getOptions().get(CONNECTOR.key());
         if (connector == null) {
             throw new RuntimeException(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
index b82c5e579c..30d4953224 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
@@ -219,4 +219,15 @@ public class FlinkGenericCatalogITCase extends AbstractTestBaseJUnit4 {
         List<Row> result = sql("SELECT tag_name FROM paimon_t$tags");
         assertThat(result).contains(Row.of("tag_1"));
     }
+
+    @Test
+    public void testCreateView() {
+        sql("CREATE TABLE paimon_t ( " + "f0 INT, " + "f1 INT " + ") WITH ('connector'='paimon')");
+        sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
+        assertThat(sql("SELECT * FROM paimon_t"))
+                .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+        sql("CREATE VIEW paimon_t_view AS SELECT * FROM paimon_t WHERE f0=1");
+
+        assertThat(sql("SELECT * FROM paimon_t_view")).containsExactlyInAnyOrder(Row.of(1, 1));
+    }
 }

Reply via email to