This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b9b92d4aa [flink] Fix not compatible with Flink 1.18 and 1.19 
(java.lang.NoSuchFieldError: MATERIALIZED_TABLE) (#2083)
b9b92d4aa is described below

commit b9b92d4aaa92d8610fe3756ad9188bfd2ec4eb73
Author: Pei Yu <[email protected]>
AuthorDate: Tue Dec 23 15:42:40 2025 +0800

    [flink] Fix not compatible with Flink 1.18 and 1.19 
(java.lang.NoSuchFieldError: MATERIALIZED_TABLE) (#2083)
---
 .../flink/table/catalog/CatalogBaseTable.java      | 115 ---------------------
 .../fluss/flink/adapter/CatalogTableAdapter.java   |  10 +-
 .../flink/adapter/Flink118CatalogTableTest.java}   |  27 ++---
 .../flink/table/catalog/CatalogBaseTable.java      | 115 ---------------------
 .../fluss/flink/adapter/CatalogTableAdapter.java   |  10 +-
 .../flink/adapter/Flink119CatalogTableTest.java}   |  27 ++---
 .../fluss/flink/adapter/CatalogTableAdapter.java   |   9 +-
 .../apache/fluss/flink/utils/FlinkConversions.java |   3 +-
 .../flink/adapter/FlinkCatalogTableTest.java}      |  31 +++---
 9 files changed, 62 insertions(+), 285 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
deleted file mode 100644
index 623449a52..000000000
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A common parent that describes the <i>unresolved</i> metadata of a table or 
view in a catalog.
- *
- * <p>Copy from Apache Flink, modified the TableKind enum to add 
MATERIALIZED_TABLE.
- *
- * @see CatalogTable
- * @see CatalogView
- */
-@PublicEvolving
-public interface CatalogBaseTable {
-
-    /** The kind of {@link CatalogBaseTable}. */
-    @PublicEvolving
-    enum TableKind {
-        TABLE,
-        MATERIALIZED_TABLE,
-        VIEW
-    }
-
-    /** The kind of table this {@link CatalogBaseTable} describes. */
-    TableKind getTableKind();
-
-    /**
-     * Returns a map of string-based options.
-     *
-     * <p>In case of {@link CatalogTable}, these options may determine the 
kind of connector and its
-     * configuration for accessing the data in the external system. See {@link 
DynamicTableFactory}
-     * for more information. If a {@link CatalogTable} should not be 
serializable, an implementation
-     * can simply throw a runtime exception in this method.
-     */
-    Map<String, String> getOptions();
-
-    /**
-     * @deprecated This method returns the deprecated {@link TableSchema} 
class. The old class was a
-     *     hybrid of resolved and unresolved schema information. It has been 
replaced by the new
-     *     {@link Schema} which is always unresolved and will be resolved by 
the framework later.
-     */
-    @Deprecated
-    default TableSchema getSchema() {
-        return null;
-    }
-
-    /**
-     * Returns the schema of the table or view.
-     *
-     * <p>The schema can reference objects from other catalogs and will be 
resolved and validated by
-     * the framework when accessing the table or view.
-     *
-     * @see ResolvedCatalogTable
-     * @see ResolvedCatalogView
-     */
-    default Schema getUnresolvedSchema() {
-        final TableSchema oldSchema = getSchema();
-        if (oldSchema == null) {
-            throw new UnsupportedOperationException(
-                    "A CatalogBaseTable must implement 
getUnresolvedSchema().");
-        }
-        return oldSchema.toSchema();
-    }
-
-    /**
-     * Get comment of the table or view.
-     *
-     * @return comment of the table/view.
-     */
-    String getComment();
-
-    /**
-     * Get a deep copy of the CatalogBaseTable instance.
-     *
-     * @return a copy of the CatalogBaseTable instance
-     */
-    CatalogBaseTable copy();
-
-    /**
-     * Get a brief description of the table or view.
-     *
-     * @return an optional short description of the table/view
-     */
-    Optional<String> getDescription();
-
-    /**
-     * Get a detailed description of the table or view.
-     *
-     * @return an optional long description of the table/view
-     */
-    Optional<String> getDetailedDescription();
-}
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index f41dc51e3..13a3bf229 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.flink.adapter;
 
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 
 import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
+ * An adapter for the {@link CatalogTable} constructor and for the 
MATERIALIZED_TABLE {@link
+ * CatalogBaseTable.TableKind}. TODO: remove this class once Flink 1.18/1.19 
support is dropped.
  */
 public class CatalogTableAdapter {
     public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public class CatalogTableAdapter {
             Map<String, String> options) {
         return CatalogTable.of(schema, comment, partitionKeys, options);
     }
+
+    public static boolean isMaterializedTable(CatalogBaseTable.TableKind 
tableKind) {
+        // Flink 1.18 does not support materialized tables, so no table kind can match.
+        return false;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
similarity index 58%
copy from 
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to 
fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
index f41dc51e3..74bd40a0a 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
@@ -17,24 +17,19 @@
 
 package org.apache.fluss.flink.adapter;
 
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
 
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter} in flink 1.18. */
+public class Flink118CatalogTableTest extends FlinkCatalogTableTest {
 
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
-    public static CatalogTable toCatalogTable(
-            Schema schema,
-            @Nullable String comment,
-            List<String> partitionKeys,
-            Map<String, String> options) {
-        return CatalogTable.of(schema, comment, partitionKeys, options);
+    @Test
+    public void testIsMaterializedTable() {
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+                .isEqualTo(false);
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+                .isEqualTo(false);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
 
b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
deleted file mode 100644
index 84f9f0c43..000000000
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A common parent that describes the <i>unresolved</i> metadata of a table or 
view in a catalog.
- *
- * <p>p> Copy from Apache Flink, modified the TableKind enum to add 
MATERIALIZED_TABLE.
- *
- * @see CatalogTable
- * @see CatalogView
- */
-@PublicEvolving
-public interface CatalogBaseTable {
-
-    /** The kind of {@link CatalogBaseTable}. */
-    @PublicEvolving
-    enum TableKind {
-        TABLE,
-        MATERIALIZED_TABLE,
-        VIEW
-    }
-
-    /** The kind of table this {@link CatalogBaseTable} describes. */
-    TableKind getTableKind();
-
-    /**
-     * Returns a map of string-based options.
-     *
-     * <p>In case of {@link CatalogTable}, these options may determine the 
kind of connector and its
-     * configuration for accessing the data in the external system. See {@link 
DynamicTableFactory}
-     * for more information. If a {@link CatalogTable} should not be 
serializable, an implementation
-     * can simply throw a runtime exception in this method.
-     */
-    Map<String, String> getOptions();
-
-    /**
-     * @deprecated This method returns the deprecated {@link TableSchema} 
class. The old class was a
-     *     hybrid of resolved and unresolved schema information. It has been 
replaced by the new
-     *     {@link Schema} which is always unresolved and will be resolved by 
the framework later.
-     */
-    @Deprecated
-    default TableSchema getSchema() {
-        return null;
-    }
-
-    /**
-     * Returns the schema of the table or view.
-     *
-     * <p>The schema can reference objects from other catalogs and will be 
resolved and validated by
-     * the framework when accessing the table or view.
-     *
-     * @see ResolvedCatalogTable
-     * @see ResolvedCatalogView
-     */
-    default Schema getUnresolvedSchema() {
-        final TableSchema oldSchema = getSchema();
-        if (oldSchema == null) {
-            throw new UnsupportedOperationException(
-                    "A CatalogBaseTable must implement 
getUnresolvedSchema().");
-        }
-        return oldSchema.toSchema();
-    }
-
-    /**
-     * Get comment of the table or view.
-     *
-     * @return comment of the table/view.
-     */
-    String getComment();
-
-    /**
-     * Get a deep copy of the CatalogBaseTable instance.
-     *
-     * @return a copy of the CatalogBaseTable instance
-     */
-    CatalogBaseTable copy();
-
-    /**
-     * Get a brief description of the table or view.
-     *
-     * @return an optional short description of the table/view
-     */
-    Optional<String> getDescription();
-
-    /**
-     * Get a detailed description of the table or view.
-     *
-     * @return an optional long description of the table/view
-     */
-    Optional<String> getDetailedDescription();
-}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index f41dc51e3..f70a21ceb 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.flink.adapter;
 
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 
 import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
+ * An adapter for the {@link CatalogTable} constructor and for the 
MATERIALIZED_TABLE {@link
+ * CatalogBaseTable.TableKind}. TODO: remove this class once Flink 1.18/1.19 
support is dropped.
  */
 public class CatalogTableAdapter {
     public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public class CatalogTableAdapter {
             Map<String, String> options) {
         return CatalogTable.of(schema, comment, partitionKeys, options);
     }
+
+    public static boolean isMaterializedTable(CatalogBaseTable.TableKind 
tableKind) {
+        // Flink 1.19 does not support materialized tables, so no table kind can match.
+        return false;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
similarity index 58%
copy from 
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to 
fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
index f41dc51e3..dbe6a15bd 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
@@ -17,24 +17,19 @@
 
 package org.apache.fluss.flink.adapter;
 
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
 
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter} in flink 1.19. */
+public class Flink119CatalogTableTest extends FlinkCatalogTableTest {
 
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
-    public static CatalogTable toCatalogTable(
-            Schema schema,
-            @Nullable String comment,
-            List<String> partitionKeys,
-            Map<String, String> options) {
-        return CatalogTable.of(schema, comment, partitionKeys, options);
+    @Test
+    public void testIsMaterializedTable() {
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+                .isEqualTo(false);
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+                .isEqualTo(false);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index 8485c2f21..e9e610ee5 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.flink.adapter;
 
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 
 import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
+ * An adapter for the {@link CatalogTable} constructor and for the 
MATERIALIZED_TABLE {@link
+ * CatalogBaseTable.TableKind}. TODO: remove this class once Flink 1.18/1.19 
support is dropped.
  */
 public class CatalogTableAdapter {
     public static CatalogTable toCatalogTable(
@@ -42,4 +43,8 @@ public class CatalogTableAdapter {
                 .options(options)
                 .build();
     }
+
+    public static boolean isMaterializedTable(CatalogBaseTable.TableKind 
tableKind) {
+        return CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index f8a05573c..c571df496 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.ConfigOption;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.config.Password;
+import org.apache.fluss.flink.adapter.CatalogTableAdapter;
 import org.apache.fluss.flink.catalog.FlinkCatalogFactory;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.Schema;
@@ -242,7 +243,7 @@ public class FlinkConversions {
                 customProperties, 
catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
 
         // Set materialized table flags to fluss table custom properties
-        if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) {
+        if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
                     ((ResolvedCatalogMaterializedTable) 
catalogBaseTable).getRefreshMode();
             if (refreshMode == CatalogMaterializedTable.RefreshMode.FULL) {
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
similarity index 53%
copy from 
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to 
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
index f41dc51e3..bc997f322 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
@@ -17,24 +17,23 @@
 
 package org.apache.fluss.flink.adapter;
 
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
 
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter}. */
+public class FlinkCatalogTableTest {
 
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
-    public static CatalogTable toCatalogTable(
-            Schema schema,
-            @Nullable String comment,
-            List<String> partitionKeys,
-            Map<String, String> options) {
-        return CatalogTable.of(schema, comment, partitionKeys, options);
+    @Test
+    public void testIsMaterializedTable() {
+        assertThat(
+                        CatalogTableAdapter.isMaterializedTable(
+                                CatalogBaseTable.TableKind.MATERIALIZED_TABLE))
+                .isEqualTo(true);
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+                .isEqualTo(false);
+        
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+                .isEqualTo(false);
     }
 }

Reply via email to