This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 86f704f4711920934736ff2a00a1833e77abb6dd
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Feb 12 17:45:06 2026 +0800

    [lake/flink] Fix lake tiering doesn't work in flink 2.2 (#2657)
---
 .../flink/adapter/TypeInformationAdapter.java      | 64 +++++++++++++++++++
 .../fluss/flink/tiering/Flink18TieringITCase.java  | 22 +++++++
 .../fluss/flink/tiering/Flink19TieringITCase.java  | 22 +++++++
 .../fluss/flink/tiering/Flink20TieringITCase.java  | 22 +++++++
 .../flink/adapter/TypeInformationAdapter.java      | 64 +++++++++++++++++++
 .../fluss/flink/tiering/Flink22TieringITCase.java  | 22 +++++++
 .../flink/adapter/TypeInformationAdapter.java      | 73 ++++++++++++++++++++++
 .../shuffle/StatisticsOrRecordTypeInformation.java | 20 +++---
 .../committer/CommittableMessageTypeInfo.java      |  8 +--
 .../source/TableBucketWriteResultTypeInfo.java     |  8 +--
 .../apache/fluss/flink/tiering/TieringITCase.java  |  2 +-
 11 files changed, 306 insertions(+), 21 deletions(-)

diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
new file mode 100644
index 000000000..1efc177bd
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink 1.18 variant of the type information adapter.
+ *
+ * <p>In Flink 1.18, {@link TypeInformation} only declares {@code
+ * createSerializer(ExecutionConfig)}. This adapter implements that method and delegates to {@link
+ * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses
+ * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching
+ * ExecutionConfig here.
+ *
+ * <p>When building for Flink 1.18, this class is used instead of the common adapter so that
+ * SerializerConfig is not required on the classpath. See fluss-flink-common's
+ * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig)
+ * and createSerializer(ExecutionConfig).
+ *
+ * @param <T> the type described by this type information
+ */
+public abstract class TypeInformationAdapter<T> extends TypeInformation<T> {
+
+    @Override
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        return createSerializer(typeInfo -> typeInfo.createSerializer(config));
+    }
+
+    /**
+     * Creates the type serializer using the given creator. The creator captures the config from the
+     * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the
+     * inner serializer and then wrap or return it.
+     */
+    protected abstract TypeSerializer<T> createSerializer(
+            TypeSerializerCreator typeSerializerCreator);
+
+    /**
+     * Creator that, given a TypeInformation, returns its serializer for the current config. Passed
+     * by the adapter so that config is forwarded to the underlying TypeInformation.
+     */
+    @FunctionalInterface
+    public interface TypeSerializerCreator {
+        TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo);
+    }
+}
diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java
new file mode 100644
index 000000000..ea7cd618e
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+/** The IT case for tiering in Flink 1.18. */
+class Flink18TieringITCase extends TieringITCase {}
diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java
new file mode 100644
index 000000000..81fb9fc63
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+/** The IT case for tiering in Flink 1.19. */
+class Flink19TieringITCase extends TieringITCase {}
diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java
new file mode 100644
index 000000000..73d2778b0
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+/** The IT case for tiering in Flink 1.20. */
+class Flink20TieringITCase extends TieringITCase {}
diff --git a/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
new file mode 100644
index 000000000..81f05766e
--- /dev/null
+++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink 2.2 variant of the type information adapter.
+ *
+ * <p>In Flink 2.2, {@link TypeInformation} declares {@code createSerializer(SerializerConfig)} as
+ * the abstract method. This adapter implements that method and delegates to {@link
+ * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses
+ * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching
+ * SerializerConfig here.
+ *
+ * <p>When building for Flink 2.2, this class is used instead of the common adapter so that the
+ * correct API is implemented for this Flink version. See fluss-flink-common's
+ * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig)
+ * and createSerializer(ExecutionConfig).
+ *
+ * @param <T> the type described by this type information
+ */
+public abstract class TypeInformationAdapter<T> extends TypeInformation<T> {
+
+    @Override
+    public TypeSerializer<T> createSerializer(SerializerConfig config) {
+        return createSerializer(typeInfo -> typeInfo.createSerializer(config));
+    }
+
+    /**
+     * Creates the type serializer using the given creator. The creator captures the config from the
+     * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the
+     * inner serializer and then wrap or return it.
+     */
+    protected abstract TypeSerializer<T> createSerializer(
+            TypeInformationAdapter.TypeSerializerCreator 
typeSerializerCreator);
+
+    /**
+     * Creator that, given a TypeInformation, returns its serializer for the current config. Passed
+     * by the adapter so that config is forwarded to the underlying TypeInformation.
+     */
+    @FunctionalInterface
+    public interface TypeSerializerCreator {
+        TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo);
+    }
+}
diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java
new file mode 100644
index 000000000..36269d854
--- /dev/null
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+/** The IT case for tiering in Flink 2.2. */
+class Flink22TieringITCase extends TieringITCase {}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
new file mode 100644
index 000000000..fdd31b05f
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Type information adapter which hides the different version of {@code createSerializer} method.
+ *
+ * <p>Implements both {@link #createSerializer(SerializerConfig)} and {@link
+ * #createSerializer(ExecutionConfig)} by delegating to {@link
+ * #createSerializer(TypeSerializerCreator)} with a creator that, given a {@link TypeInformation},
+ * returns {@code typeInfo.createSerializer(config)}. Subclasses implement {@link
+ * #createSerializer(TypeSerializerCreator)} by calling the creator with their inner type
+ * information (e.g. row type) and wrapping the result. This way config is passed through to the
+ * underlying TypeInformation.
+ *
+ * <p>See {@link org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordTypeInformation} for usage.
+ *
+ * <p>TODO: remove this class when no longer supporting Flink versions that only have one of the two
+ * createSerializer signatures.
+ *
+ * @param <T> the type described by this type information
+ */
+public abstract class TypeInformationAdapter<T> extends TypeInformation<T> {
+
+    @Override
+    public TypeSerializer<T> createSerializer(SerializerConfig config) {
+        return createSerializer(typeInfo -> typeInfo.createSerializer(config));
+    }
+
+    @Override
+    @Deprecated
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        return createSerializer(typeInfo -> typeInfo.createSerializer(config));
+    }
+
+    /**
+     * Creates the type serializer using the given creator. The creator captures the config from the
+     * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the
+     * inner serializer and then wrap or return it.
+     */
+    protected abstract TypeSerializer<T> createSerializer(
+            TypeSerializerCreator typeSerializerCreator);
+
+    /**
+     * Creator that, given a TypeInformation, returns its serializer for the current config. Passed
+     * by the adapter so that config is forwarded to the underlying TypeInformation.
+     */
+    @FunctionalInterface
+    public interface TypeSerializerCreator {
+        TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo);
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java
index ca5721040..a410fdb57 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java
@@ -18,9 +18,8 @@
 package org.apache.fluss.flink.sink.shuffle;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.flink.adapter.TypeInformationAdapter;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -33,7 +32,7 @@ import java.util.Objects;
  */
 @Internal
 public class StatisticsOrRecordTypeInformation<InputT>
-        extends TypeInformation<StatisticsOrRecord<InputT>> {
+        extends TypeInformationAdapter<StatisticsOrRecord<InputT>> {
 
     private final TypeInformation<InputT> rowTypeInformation;
     private final DataStatisticsSerializer globalStatisticsSerializer;
@@ -75,15 +74,11 @@ public class StatisticsOrRecordTypeInformation<InputT>
     }
 
     @Override
-    public TypeSerializer<StatisticsOrRecord<InputT>> 
createSerializer(SerializerConfig config) {
-        TypeSerializer<InputT> recordSerializer = 
rowTypeInformation.createSerializer(config);
-        return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, 
recordSerializer);
-    }
-
-    @Override
-    @Deprecated
-    public TypeSerializer<StatisticsOrRecord<InputT>> 
createSerializer(ExecutionConfig config) {
-        TypeSerializer<InputT> recordSerializer = 
rowTypeInformation.createSerializer(config);
+    @SuppressWarnings("unchecked")
+    protected TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(
+            TypeInformationAdapter.TypeSerializerCreator 
typeSerializerCreator) {
+        TypeSerializer<InputT> recordSerializer =
+                (TypeSerializer<InputT>) 
typeSerializerCreator.createSerializer(rowTypeInformation);
         return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, 
recordSerializer);
     }
 
@@ -93,6 +88,7 @@ public class StatisticsOrRecordTypeInformation<InputT>
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean equals(Object o) {
         if (this == o) {
             return true;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java
index c0cf567ce..d541721d1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java
@@ -17,10 +17,10 @@
 
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.flink.adapter.TypeInformationAdapter;
 import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
@@ -30,7 +30,7 @@ import java.io.IOException;
 
 /** A {@link TypeInformation} for {@link CommittableMessage}. */
 public class CommittableMessageTypeInfo<Committable>
-        extends TypeInformation<CommittableMessage<Committable>> {
+        extends TypeInformationAdapter<CommittableMessage<Committable>> {
 
     private final SerializableSupplier<SimpleVersionedSerializer<Committable>>
             committableSerializerFactory;
@@ -79,8 +79,8 @@ public class CommittableMessageTypeInfo<Committable>
     }
 
     @Override
-    public TypeSerializer<CommittableMessage<Committable>> createSerializer(
-            ExecutionConfig executionConfig) {
+    protected TypeSerializer<CommittableMessage<Committable>> createSerializer(
+            TypeSerializerCreator typeSerializerCreator) {
         return new 
SimpleVersionedSerializerTypeSerializerProxy<CommittableMessage<Committable>>(
                 () ->
                         new org.apache.flink.core.io.SimpleVersionedSerializer<
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java
index d68cf5d71..424673c26 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java
@@ -17,9 +17,9 @@
 
 package org.apache.fluss.flink.tiering.source;
 
+import org.apache.fluss.flink.adapter.TypeInformationAdapter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
@@ -27,7 +27,7 @@ import org.apache.flink.util.function.SerializableSupplier;
 
 /** A {@link TypeInformation} for {@link TableBucketWriteResult} . */
 public class TableBucketWriteResultTypeInfo<WriteResult>
-        extends TypeInformation<TableBucketWriteResult<WriteResult>> {
+        extends TypeInformationAdapter<TableBucketWriteResult<WriteResult>> {
 
     private final SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
             writeResultSerializerFactory;
@@ -76,8 +76,8 @@ public class TableBucketWriteResultTypeInfo<WriteResult>
     }
 
     @Override
-    public TypeSerializer<TableBucketWriteResult<WriteResult>> 
createSerializer(
-            ExecutionConfig executionConfig) {
+    protected TypeSerializer<TableBucketWriteResult<WriteResult>> 
createSerializer(
+            TypeSerializerCreator typeSerializerCreator) {
         // no copy, so that data from lake writer is directly going into lake committer while
         // chaining
         return new SimpleVersionedSerializerTypeSerializerProxy<
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
index fe81b5a84..86b89e4a7 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
@@ -48,7 +48,7 @@ import static 
org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The IT case for tiering. */
-class TieringITCase extends FlinkTieringTestBase {
+abstract class TieringITCase extends FlinkTieringTestBase {
 
     @BeforeAll
     protected static void beforeAll() {

Reply via email to