This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1055e42c4 [flink] Add RuntimeContext adapter for Flink 2.x compatibility (#2241)
1055e42c4 is described below
commit 1055e42c40efc019856b81419e918829f2574366
Author: Pei Yu <[email protected]>
AuthorDate: Mon Dec 29 10:19:20 2025 +0800
[flink] Add RuntimeContext adapter for Flink 2.x compatibility (#2241)
---
.../fluss/flink/adapter/RuntimeContextAdapter.java | 34 ++++++++++++++
.../adapter/StreamOperatorParametersAdapter.java | 53 +++++++++++++++++++++
.../Flink118TieringCommitOperatorTest.java | 24 ++++++++++
.../adapter/StreamOperatorParametersAdapter.java | 53 +++++++++++++++++++++
.../Flink119TieringCommitOperatorTest.java | 24 ++++++++++
.../Flink22TieringCommitOperatorTest.java | 24 ++++++++++
.../fluss/flink/adapter/RuntimeContextAdapter.java | 34 ++++++++++++++
.../tiering/committer/TieringCommitOperator.java | 3 +-
.../adapter/StreamOperatorParametersAdapter.java | 54 ++++++++++++++++++++++
.../committer/TieringCommitOperatorTest.java | 3 +-
10 files changed, 304 insertions(+), 2 deletions(-)
diff --git
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
new file mode 100644
index 000000000..be97b7048
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An adapter for the Flink {@link RuntimeContext} class. In version 1.19, {@link RuntimeContext}
+ * added the `getJobInfo` and `getTaskInfo` methods and deprecated many methods, such as
+ * `getAttemptNumber`.
+ *
+ * <p>TODO: remove this class when Flink 1.18 is no longer supported.
+ */
+public class RuntimeContextAdapter {
+
+ public static int getAttemptNumber(RuntimeContext runtimeContext) {
+ return runtimeContext.getAttemptNumber();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..49f7038a4
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters}, because its constructor in Flink 1.18 and 1.19 is
+ * incompatible with later versions. However, this constructor is only used in tests.
+ *
+ * <p>TODO: remove this class when Flink 1.18 and 1.19 are no longer supported.
+ */
+public class StreamOperatorParametersAdapter {
+
+ public static <OUT> StreamOperatorParameters<OUT> create(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output,
+ Supplier<ProcessingTimeService> processingTimeServiceFactory,
+ OperatorEventDispatcher operatorEventDispatcher,
+ MailboxExecutor mailboxExecutor) {
+ return new StreamOperatorParameters<>(
+ containingTask,
+ config,
+ output,
+ processingTimeServiceFactory,
+ operatorEventDispatcher);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
new file mode 100644
index 000000000..b3c04505e
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Tests the compatibility of the `getAttemptNumber` method in
+ * Flink 1.18.
+ */
+public class Flink118TieringCommitOperatorTest extends TieringCommitOperatorTest {}
diff --git
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..49f7038a4
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters}, because its constructor in Flink 1.18 and 1.19 is
+ * incompatible with later versions. However, this constructor is only used in tests.
+ *
+ * <p>TODO: remove this class when Flink 1.18 and 1.19 are no longer supported.
+ */
+public class StreamOperatorParametersAdapter {
+
+ public static <OUT> StreamOperatorParameters<OUT> create(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output,
+ Supplier<ProcessingTimeService> processingTimeServiceFactory,
+ OperatorEventDispatcher operatorEventDispatcher,
+ MailboxExecutor mailboxExecutor) {
+ return new StreamOperatorParameters<>(
+ containingTask,
+ config,
+ output,
+ processingTimeServiceFactory,
+ operatorEventDispatcher);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
new file mode 100644
index 000000000..7fd1a20d0
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Tests the compatibility of the `getAttemptNumber` method in
+ * Flink 1.19.
+ */
+public class Flink119TieringCommitOperatorTest extends TieringCommitOperatorTest {}
diff --git
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
new file mode 100644
index 000000000..25cbfcc3f
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Tests the compatibility of the `getAttemptNumber` method in
+ * Flink 2.2.
+ */
+public class Flink22TieringCommitOperatorTest extends TieringCommitOperatorTest {}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
new file mode 100644
index 000000000..f635fbeb6
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An adapter for the Flink {@link RuntimeContext} class. In version 1.19, {@link RuntimeContext}
+ * added the `getJobInfo` and `getTaskInfo` methods and deprecated many methods, such as
+ * `getAttemptNumber`.
+ *
+ * <p>TODO: remove this class when Flink 1.18 is no longer supported.
+ */
+public class RuntimeContextAdapter {
+
+ public static int getAttemptNumber(RuntimeContext runtimeContext) {
+ return runtimeContext.getTaskInfo().getAttemptNumber();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 9d60c8899..8954ed6f5 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -23,6 +23,7 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.flink.adapter.RuntimeContextAdapter;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
@@ -132,7 +133,7 @@ public class TieringCommitOperator<WriteResult, Committable>
StreamConfig config,
Output<StreamRecord<CommittableMessage<Committable>>> output) {
super.setup(containingTask, config, output);
- int attemptNumber = getRuntimeContext().getAttemptNumber();
+ int attemptNumber = RuntimeContextAdapter.getAttemptNumber(getRuntimeContext());
if (attemptNumber > 0) {
LOG.info("Send TieringFailoverEvent, current attempt number: {}", attemptNumber);
// attempt number is greater than zero, the job must failover
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..e0989cb1e
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters}, because its constructor in Flink 1.18 and 1.19 is
+ * incompatible with later versions. However, this constructor is only used in tests.
+ *
+ * <p>TODO: remove this class when Flink 1.18 and 1.19 are no longer supported.
+ */
+public class StreamOperatorParametersAdapter {
+
+ public static <OUT> StreamOperatorParameters<OUT> create(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output,
+ Supplier<ProcessingTimeService> processingTimeServiceFactory,
+ OperatorEventDispatcher operatorEventDispatcher,
+ MailboxExecutor mailboxExecutor) {
+ return new StreamOperatorParameters<>(
+ containingTask,
+ config,
+ output,
+ processingTimeServiceFactory,
+ operatorEventDispatcher,
+ mailboxExecutor);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 0be879e9f..618d45445 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.tiering.committer;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.flink.adapter.StreamOperatorParametersAdapter;
import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
import org.apache.fluss.flink.tiering.TestingWriteResult;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
@@ -76,7 +77,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
MockOperatorEventDispatcher mockOperatorEventDispatcher =
new MockOperatorEventDispatcher(mockOperatorEventGateway);
parameters =
- new StreamOperatorParameters<>(
+ StreamOperatorParametersAdapter.create(
new SourceOperatorStreamTask<String>(new DummyEnvironment()),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()),