This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 86f704f4711920934736ff2a00a1833e77abb6dd Author: yuxia Luo <[email protected]> AuthorDate: Thu Feb 12 17:45:06 2026 +0800 [lake/flink] Fix lake tiering doesn't work in flink 2.2 (#2657) --- .../flink/adapter/TypeInformationAdapter.java | 64 +++++++++++++++++++ .../fluss/flink/tiering/Flink18TieringITCase.java | 22 +++++++ .../fluss/flink/tiering/Flink19TieringITCase.java | 22 +++++++ .../fluss/flink/tiering/Flink20TieringITCase.java | 22 +++++++ .../flink/adapter/TypeInformationAdapter.java | 64 +++++++++++++++++++ .../fluss/flink/tiering/Flink22TieringITCase.java | 22 +++++++ .../flink/adapter/TypeInformationAdapter.java | 73 ++++++++++++++++++++++ .../shuffle/StatisticsOrRecordTypeInformation.java | 20 +++--- .../committer/CommittableMessageTypeInfo.java | 8 +-- .../source/TableBucketWriteResultTypeInfo.java | 8 +-- .../apache/fluss/flink/tiering/TieringITCase.java | 2 +- 11 files changed, 306 insertions(+), 21 deletions(-) diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java new file mode 100644 index 000000000..1efc177bd --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Flink 1.18 variant of the type information adapter. + * + * <p>In Flink 1.18, {@link TypeInformation} only declares {@code + * createSerializer(ExecutionConfig)}. This adapter implements that method and delegates to {@link + * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses + * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching + * ExecutionConfig here. + * + * <p>When building for Flink 1.18, this class is used instead of the common adapter so that + * SerializerConfig is not required on the classpath. See fluss-flink-common's + * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig) + * and createSerializer(ExecutionConfig). + * + * @param <T> the type described by this type information + */ +public abstract class TypeInformationAdapter<T> extends TypeInformation<T> { + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + /** + * Creates the type serializer using the given creator. The creator captures the config from the + * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the + * inner serializer and then wrap or return it. + */ + protected abstract TypeSerializer<T> createSerializer( + TypeSerializerCreator typeSerializerCreator); + + /** + * Creator that, given a TypeInformation, returns its serializer for the current config. Passed + * by the adapter so that config is forwarded to the underlying TypeInformation. + */ + @FunctionalInterface + public interface TypeSerializerCreator { + TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo); + } +} diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java new file mode 100644 index 000000000..ea7cd618e --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 1.18. */ +class Flink18TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java new file mode 100644 index 000000000..81fb9fc63 --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 1.19. */ +class Flink19TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java new file mode 100644 index 000000000..73d2778b0 --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 1.20. */ +class Flink20TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java new file mode 100644 index 000000000..81f05766e --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Flink 2.2 variant of the type information adapter. + * + * <p>In Flink 2.2, {@link TypeInformation} declares {@code createSerializer(SerializerConfig)} as + * the abstract method. This adapter implements that method and delegates to {@link + * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses + * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching + * SerializerConfig here. + * + * <p>When building for Flink 2.2, this class is used instead of the common adapter so that the + * correct API is implemented for this Flink version. See fluss-flink-common's + * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig) + * and createSerializer(ExecutionConfig). + * + * @param <T> the type described by this type information + */ +public abstract class TypeInformationAdapter<T> extends TypeInformation<T> { + + @Override + public TypeSerializer<T> createSerializer(SerializerConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + /** + * Creates the type serializer using the given creator. The creator captures the config from the + * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the + * inner serializer and then wrap or return it. + */ + protected abstract TypeSerializer<T> createSerializer( + TypeInformationAdapter.TypeSerializerCreator typeSerializerCreator); + + /** + * Creator that, given a TypeInformation, returns its serializer for the current config. Passed + * by the adapter so that config is forwarded to the underlying TypeInformation. + */ + @FunctionalInterface + public interface TypeSerializerCreator { + TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo); + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java new file mode 100644 index 000000000..36269d854 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 2.2. */ +class Flink22TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java new file mode 100644 index 000000000..fdd31b05f --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Type information adapter which hides the different version of {@code createSerializer} method. + * + * <p>Implements both {@link #createSerializer(SerializerConfig)} and {@link + * #createSerializer(ExecutionConfig)} by delegating to {@link + * #createSerializer(TypeSerializerCreator)} with a creator that, given a {@link TypeInformation}, + * returns {@code typeInfo.createSerializer(config)}. Subclasses implement {@link + * #createSerializer(TypeSerializerCreator)} by calling the creator with their inner type + * information (e.g. row type) and wrapping the result. This way config is passed through to the + * underlying TypeInformation. + * + * <p>See {@link org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordTypeInformation} for usage. + * + * <p>TODO: remove this class when no longer supporting Flink versions that only have one of the two + * createSerializer signatures. + * + * @param <T> the type described by this type information + */ +public abstract class TypeInformationAdapter<T> extends TypeInformation<T> { + + @Override + public TypeSerializer<T> createSerializer(SerializerConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + @Override + @Deprecated + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + /** + * Creates the type serializer using the given creator. The creator captures the config from the + * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the + * inner serializer and then wrap or return it. + */ + protected abstract TypeSerializer<T> createSerializer( + TypeSerializerCreator typeSerializerCreator); + + /** + * Creator that, given a TypeInformation, returns its serializer for the current config. Passed + * by the adapter so that config is forwarded to the underlying TypeInformation. + */ + @FunctionalInterface + public interface TypeSerializerCreator { + TypeSerializer<?> createSerializer(TypeInformation<?> typeInfo); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java index ca5721040..a410fdb57 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java @@ -18,9 +18,8 @@ package org.apache.fluss.flink.sink.shuffle; import org.apache.fluss.annotation.Internal; +import org.apache.fluss.flink.adapter.TypeInformationAdapter; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -33,7 +32,7 @@ import java.util.Objects; */ @Internal public class StatisticsOrRecordTypeInformation<InputT> - extends TypeInformation<StatisticsOrRecord<InputT>> { + extends TypeInformationAdapter<StatisticsOrRecord<InputT>> { private final TypeInformation<InputT> rowTypeInformation; private final DataStatisticsSerializer globalStatisticsSerializer; @@ -75,15 +74,11 @@ public class StatisticsOrRecordTypeInformation<InputT> } @Override - public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(SerializerConfig config) { - TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config); - return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer); - } - - @Override - @Deprecated - public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(ExecutionConfig config) { - TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config); + @SuppressWarnings("unchecked") + protected TypeSerializer<StatisticsOrRecord<InputT>> createSerializer( + TypeInformationAdapter.TypeSerializerCreator typeSerializerCreator) { + TypeSerializer<InputT> recordSerializer = + (TypeSerializer<InputT>) typeSerializerCreator.createSerializer(rowTypeInformation); return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer); } @@ -93,6 +88,7 @@ public class StatisticsOrRecordTypeInformation<InputT> } @Override + @SuppressWarnings("unchecked") public boolean equals(Object o) { if (this == o) { return true; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java index c0cf567ce..d541721d1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java @@ -17,10 +17,10 @@ package org.apache.fluss.flink.tiering.committer; +import org.apache.fluss.flink.adapter.TypeInformationAdapter; import org.apache.fluss.flink.tiering.source.TableBucketWriteResult; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; @@ -30,7 +30,7 @@ import java.io.IOException; /** A {@link TypeInformation} for {@link CommittableMessage}. */ public class CommittableMessageTypeInfo<Committable> - extends TypeInformation<CommittableMessage<Committable>> { + extends TypeInformationAdapter<CommittableMessage<Committable>> { private final SerializableSupplier<SimpleVersionedSerializer<Committable>> committableSerializerFactory; @@ -79,8 +79,8 @@ public class CommittableMessageTypeInfo<Committable> } @Override - public TypeSerializer<CommittableMessage<Committable>> createSerializer( - ExecutionConfig executionConfig) { + protected TypeSerializer<CommittableMessage<Committable>> createSerializer( + TypeSerializerCreator typeSerializerCreator) { return new SimpleVersionedSerializerTypeSerializerProxy<CommittableMessage<Committable>>( () -> new org.apache.flink.core.io.SimpleVersionedSerializer< diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java index d68cf5d71..424673c26 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java @@ -17,9 +17,9 @@ package org.apache.fluss.flink.tiering.source; +import org.apache.fluss.flink.adapter.TypeInformationAdapter; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; @@ -27,7 +27,7 @@ import org.apache.flink.util.function.SerializableSupplier; /** A {@link TypeInformation} for {@link TableBucketWriteResult} . */ public class TableBucketWriteResultTypeInfo<WriteResult> - extends TypeInformation<TableBucketWriteResult<WriteResult>> { + extends TypeInformationAdapter<TableBucketWriteResult<WriteResult>> { private final SerializableSupplier<SimpleVersionedSerializer<WriteResult>> writeResultSerializerFactory; @@ -76,8 +76,8 @@ public class TableBucketWriteResultTypeInfo<WriteResult> } @Override - public TypeSerializer<TableBucketWriteResult<WriteResult>> createSerializer( - ExecutionConfig executionConfig) { + protected TypeSerializer<TableBucketWriteResult<WriteResult>> createSerializer( + TypeSerializerCreator typeSerializerCreator) { // no copy, so that data from lake writer is directly going into lake committer while // chaining return new SimpleVersionedSerializerTypeSerializerProxy< diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java index fe81b5a84..86b89e4a7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java @@ -48,7 +48,7 @@ import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; import static org.assertj.core.api.Assertions.assertThat; /** The IT case for tiering. */ -class TieringITCase extends FlinkTieringTestBase { +abstract class TieringITCase extends FlinkTieringTestBase { @BeforeAll protected static void beforeAll() {
