This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b9b92d4aa [flink] Fix not compatible with Flink 1.18 and 1.19 (java.lang.NoSuchFieldError: MATERIALIZED_TABLE) (#2083)
b9b92d4aa is described below
commit b9b92d4aaa92d8610fe3756ad9188bfd2ec4eb73
Author: Pei Yu <[email protected]>
AuthorDate: Tue Dec 23 15:42:40 2025 +0800
[flink] Fix not compatible with Flink 1.18 and 1.19 (java.lang.NoSuchFieldError: MATERIALIZED_TABLE) (#2083)
---
.../flink/table/catalog/CatalogBaseTable.java | 115 ---------------------
.../fluss/flink/adapter/CatalogTableAdapter.java | 10 +-
.../flink/adapter/Flink118CatalogTableTest.java} | 27 ++---
.../flink/table/catalog/CatalogBaseTable.java | 115 ---------------------
.../fluss/flink/adapter/CatalogTableAdapter.java | 10 +-
.../flink/adapter/Flink119CatalogTableTest.java} | 27 ++---
.../fluss/flink/adapter/CatalogTableAdapter.java | 9 +-
.../apache/fluss/flink/utils/FlinkConversions.java | 3 +-
.../flink/adapter/FlinkCatalogTableTest.java} | 31 +++---
9 files changed, 62 insertions(+), 285 deletions(-)
diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
deleted file mode 100644
index 623449a52..000000000
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A common parent that describes the <i>unresolved</i> metadata of a table or view in a catalog.
- *
- * <p>Copy from Apache Flink, modified the TableKind enum to add MATERIALIZED_TABLE.
- *
- * @see CatalogTable
- * @see CatalogView
- */
-@PublicEvolving
-public interface CatalogBaseTable {
-
- /** The kind of {@link CatalogBaseTable}. */
- @PublicEvolving
- enum TableKind {
- TABLE,
- MATERIALIZED_TABLE,
- VIEW
- }
-
- /** The kind of table this {@link CatalogBaseTable} describes. */
- TableKind getTableKind();
-
- /**
- * Returns a map of string-based options.
- *
- * <p>In case of {@link CatalogTable}, these options may determine the kind of connector and its
- * configuration for accessing the data in the external system. See {@link DynamicTableFactory}
- * for more information. If a {@link CatalogTable} should not be serializable, an implementation
- * can simply throw a runtime exception in this method.
- */
- Map<String, String> getOptions();
-
- /**
- * @deprecated This method returns the deprecated {@link TableSchema} class. The old class was a
- * hybrid of resolved and unresolved schema information. It has been replaced by the new
- * {@link Schema} which is always unresolved and will be resolved by the framework later.
- */
- @Deprecated
- default TableSchema getSchema() {
- return null;
- }
-
- /**
- * Returns the schema of the table or view.
- *
- * <p>The schema can reference objects from other catalogs and will be resolved and validated by
- * the framework when accessing the table or view.
- *
- * @see ResolvedCatalogTable
- * @see ResolvedCatalogView
- */
- default Schema getUnresolvedSchema() {
- final TableSchema oldSchema = getSchema();
- if (oldSchema == null) {
- throw new UnsupportedOperationException(
- "A CatalogBaseTable must implement getUnresolvedSchema().");
- }
- return oldSchema.toSchema();
- }
-
- /**
- * Get comment of the table or view.
- *
- * @return comment of the table/view.
- */
- String getComment();
-
- /**
- * Get a deep copy of the CatalogBaseTable instance.
- *
- * @return a copy of the CatalogBaseTable instance
- */
- CatalogBaseTable copy();
-
- /**
- * Get a brief description of the table or view.
- *
- * @return an optional short description of the table/view
- */
- Optional<String> getDescription();
-
- /**
- * Get a detailed description of the table or view.
- *
- * @return an optional long description of the table/view
- */
- Optional<String> getDetailedDescription();
-}
diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index f41dc51e3..13a3bf229 100644
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
package org.apache.fluss.flink.adapter;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
import java.util.Map;
/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
+ * A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
+ * CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
*/
public class CatalogTableAdapter {
public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public class CatalogTableAdapter {
Map<String, String> options) {
return CatalogTable.of(schema, comment, partitionKeys, options);
}
+
+ public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
+ // flink 1.18 not support MaterializedTable
+ return false;
+ }
}
diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
similarity index 58%
copy from fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
index f41dc51e3..74bd40a0a 100644
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/Flink118CatalogTableTest.java
@@ -17,24 +17,19 @@
package org.apache.fluss.flink.adapter;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter} in flink 1.18. */
+public class Flink118CatalogTableTest extends FlinkCatalogTableTest {
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
- public static CatalogTable toCatalogTable(
- Schema schema,
- @Nullable String comment,
- List<String> partitionKeys,
- Map<String, String> options) {
- return CatalogTable.of(schema, comment, partitionKeys, options);
+ @Test
+ public void testIsMaterializedTable() {
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+ .isEqualTo(false);
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+ .isEqualTo(false);
}
}
diff --git a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
deleted file mode 100644
index 84f9f0c43..000000000
--- a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A common parent that describes the <i>unresolved</i> metadata of a table or view in a catalog.
- *
- * <p>p> Copy from Apache Flink, modified the TableKind enum to add MATERIALIZED_TABLE.
- *
- * @see CatalogTable
- * @see CatalogView
- */
-@PublicEvolving
-public interface CatalogBaseTable {
-
- /** The kind of {@link CatalogBaseTable}. */
- @PublicEvolving
- enum TableKind {
- TABLE,
- MATERIALIZED_TABLE,
- VIEW
- }
-
- /** The kind of table this {@link CatalogBaseTable} describes. */
- TableKind getTableKind();
-
- /**
- * Returns a map of string-based options.
- *
- * <p>In case of {@link CatalogTable}, these options may determine the kind of connector and its
- * configuration for accessing the data in the external system. See {@link DynamicTableFactory}
- * for more information. If a {@link CatalogTable} should not be serializable, an implementation
- * can simply throw a runtime exception in this method.
- */
- Map<String, String> getOptions();
-
- /**
- * @deprecated This method returns the deprecated {@link TableSchema} class. The old class was a
- * hybrid of resolved and unresolved schema information. It has been replaced by the new
- * {@link Schema} which is always unresolved and will be resolved by the framework later.
- */
- @Deprecated
- default TableSchema getSchema() {
- return null;
- }
-
- /**
- * Returns the schema of the table or view.
- *
- * <p>The schema can reference objects from other catalogs and will be resolved and validated by
- * the framework when accessing the table or view.
- *
- * @see ResolvedCatalogTable
- * @see ResolvedCatalogView
- */
- default Schema getUnresolvedSchema() {
- final TableSchema oldSchema = getSchema();
- if (oldSchema == null) {
- throw new UnsupportedOperationException(
- "A CatalogBaseTable must implement getUnresolvedSchema().");
- }
- return oldSchema.toSchema();
- }
-
- /**
- * Get comment of the table or view.
- *
- * @return comment of the table/view.
- */
- String getComment();
-
- /**
- * Get a deep copy of the CatalogBaseTable instance.
- *
- * @return a copy of the CatalogBaseTable instance
- */
- CatalogBaseTable copy();
-
- /**
- * Get a brief description of the table or view.
- *
- * @return an optional short description of the table/view
- */
- Optional<String> getDescription();
-
- /**
- * Get a detailed description of the table or view.
- *
- * @return an optional long description of the table/view
- */
- Optional<String> getDetailedDescription();
-}
diff --git a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index f41dc51e3..f70a21ceb 100644
--- a/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
package org.apache.fluss.flink.adapter;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
import java.util.Map;
/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
+ * A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
+ * CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
*/
public class CatalogTableAdapter {
public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public class CatalogTableAdapter {
Map<String, String> options) {
return CatalogTable.of(schema, comment, partitionKeys, options);
}
+
+ public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
+ // flink 1.19 not support MaterializedTable
+ return false;
+ }
}
diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
similarity index 58%
copy from fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
index f41dc51e3..dbe6a15bd 100644
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/Flink119CatalogTableTest.java
@@ -17,24 +17,19 @@
package org.apache.fluss.flink.adapter;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter} in flink 1.19. */
+public class Flink119CatalogTableTest extends FlinkCatalogTableTest {
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
- public static CatalogTable toCatalogTable(
- Schema schema,
- @Nullable String comment,
- List<String> partitionKeys,
- Map<String, String> options) {
- return CatalogTable.of(schema, comment, partitionKeys, options);
+ @Test
+ public void testIsMaterializedTable() {
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+ .isEqualTo(false);
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+ .isEqualTo(false);
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
index 8485c2f21..e9e610ee5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
@@ -18,6 +18,7 @@
package org.apache.fluss.flink.adapter;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@ import java.util.List;
import java.util.Map;
/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
+ * A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
+ * CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
*/
public class CatalogTableAdapter {
public static CatalogTable toCatalogTable(
@@ -42,4 +43,8 @@ public class CatalogTableAdapter {
.options(options)
.build();
}
+
+ public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
+ return CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind;
+ }
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index f8a05573c..c571df496 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOption;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.config.Password;
+import org.apache.fluss.flink.adapter.CatalogTableAdapter;
import org.apache.fluss.flink.catalog.FlinkCatalogFactory;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.Schema;
@@ -242,7 +243,7 @@ public class FlinkConversions {
customProperties,
catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
// Set materialized table flags to fluss table custom properties
- if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) {
+ if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
CatalogMaterializedTable.RefreshMode refreshMode =
((ResolvedCatalogMaterializedTable) catalogBaseTable).getRefreshMode();
if (refreshMode == CatalogMaterializedTable.RefreshMode.FULL) {
diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
similarity index 53%
copy from fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
copy to fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
index f41dc51e3..bc997f322 100644
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkCatalogTableTest.java
@@ -17,24 +17,23 @@
package org.apache.fluss.flink.adapter;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.junit.jupiter.api.Test;
-import javax.annotation.Nullable;
+import static org.assertj.core.api.Assertions.assertThat;
-import java.util.List;
-import java.util.Map;
+/** Test for {@link CatalogTableAdapter}. */
+public class FlinkCatalogTableTest {
-/**
- * A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
- * flink 1.18 and 1.19.
- */
-public class CatalogTableAdapter {
- public static CatalogTable toCatalogTable(
- Schema schema,
- @Nullable String comment,
- List<String> partitionKeys,
- Map<String, String> options) {
- return CatalogTable.of(schema, comment, partitionKeys, options);
+ @Test
+ public void testIsMaterializedTable() {
+ assertThat(
+ CatalogTableAdapter.isMaterializedTable(
+ CatalogBaseTable.TableKind.MATERIALIZED_TABLE))
+ .isEqualTo(true);
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
+ .isEqualTo(false);
+ assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
+ .isEqualTo(false);
}
}