This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1055e42c4 [flink] Add RuntimeContext adapter for Flink 2.x 
compatibility (#2241)
1055e42c4 is described below

commit 1055e42c40efc019856b81419e918829f2574366
Author: Pei Yu <[email protected]>
AuthorDate: Mon Dec 29 10:19:20 2025 +0800

    [flink] Add RuntimeContext adapter for Flink 2.x compatibility (#2241)
---
 .../fluss/flink/adapter/RuntimeContextAdapter.java | 34 ++++++++++++++
 .../adapter/StreamOperatorParametersAdapter.java   | 53 +++++++++++++++++++++
 .../Flink118TieringCommitOperatorTest.java         | 24 ++++++++++
 .../adapter/StreamOperatorParametersAdapter.java   | 53 +++++++++++++++++++++
 .../Flink119TieringCommitOperatorTest.java         | 24 ++++++++++
 .../Flink22TieringCommitOperatorTest.java          | 24 ++++++++++
 .../fluss/flink/adapter/RuntimeContextAdapter.java | 34 ++++++++++++++
 .../tiering/committer/TieringCommitOperator.java   |  3 +-
 .../adapter/StreamOperatorParametersAdapter.java   | 54 ++++++++++++++++++++++
 .../committer/TieringCommitOperatorTest.java       |  3 +-
 10 files changed, 304 insertions(+), 2 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
new file mode 100644
index 000000000..be97b7048
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An adapter for Flink {@link RuntimeContext} class. The {@link 
RuntimeContext} class added the
+ * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many 
methods, such as
+ * `getAttemptNumber`.
+ *
+ * <p>TODO: remove this class when no longer support flink 1.18.
+ */
+public class RuntimeContextAdapter {
+
+    public static int getAttemptNumber(RuntimeContext runtimeContext) {
+        return runtimeContext.getAttemptNumber();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..49f7038a4
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters} because the constructor is 
compatibility in flink
+ * 1.18 and 1.19. However, this constructor only used in test.
+ *
+ * <p>TODO: remove this class when no longer support flink 1.18 and 1.19.
+ */
+public class StreamOperatorParametersAdapter {
+
+    public static <OUT> StreamOperatorParameters<OUT> create(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output,
+            Supplier<ProcessingTimeService> processingTimeServiceFactory,
+            OperatorEventDispatcher operatorEventDispatcher,
+            MailboxExecutor mailboxExecutor) {
+        return new StreamOperatorParameters<>(
+                containingTask,
+                config,
+                output,
+                processingTimeServiceFactory,
+                operatorEventDispatcher);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
new file mode 100644
index 000000000..b3c04505e
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Test the compatibility of the 
`getAttemptNumber` method in
+ * flink 1.18.
+ */
+public class Flink118TieringCommitOperatorTest extends 
TieringCommitOperatorTest {}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..49f7038a4
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters} because the constructor is 
compatibility in flink
+ * 1.18 and 1.19. However, this constructor only used in test.
+ *
+ * <p>TODO: remove this class when no longer support flink 1.18 and 1.19.
+ */
+public class StreamOperatorParametersAdapter {
+
+    public static <OUT> StreamOperatorParameters<OUT> create(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output,
+            Supplier<ProcessingTimeService> processingTimeServiceFactory,
+            OperatorEventDispatcher operatorEventDispatcher,
+            MailboxExecutor mailboxExecutor) {
+        return new StreamOperatorParameters<>(
+                containingTask,
+                config,
+                output,
+                processingTimeServiceFactory,
+                operatorEventDispatcher);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
new file mode 100644
index 000000000..7fd1a20d0
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Test the compatibility of the 
`getAttemptNumber` method in
+ * flink 1.19.
+ */
+public class Flink119TieringCommitOperatorTest extends 
TieringCommitOperatorTest {}
diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
new file mode 100644
index 000000000..25cbfcc3f
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.committer;
+
+/**
+ * UT for {@link TieringCommitOperator}. Test the compatibility of the 
`getAttemptNumber` method in
+ * flink 2.2.
+ */
+public class Flink22TieringCommitOperatorTest extends 
TieringCommitOperatorTest {}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
new file mode 100644
index 000000000..f635fbeb6
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An adapter for Flink {@link RuntimeContext} class. The {@link 
RuntimeContext} class added the
+ * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many 
methods, such as
+ * `getAttemptNumber`.
+ *
+ * <p>TODO: remove this class when no longer support flink 1.18.
+ */
+public class RuntimeContextAdapter {
+
+    public static int getAttemptNumber(RuntimeContext runtimeContext) {
+        return runtimeContext.getTaskInfo().getAttemptNumber();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 9d60c8899..8954ed6f5 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -23,6 +23,7 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.flink.adapter.RuntimeContextAdapter;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
 import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
@@ -132,7 +133,7 @@ public class TieringCommitOperator<WriteResult, Committable>
             StreamConfig config,
             Output<StreamRecord<CommittableMessage<Committable>>> output) {
         super.setup(containingTask, config, output);
-        int attemptNumber = getRuntimeContext().getAttemptNumber();
+        int attemptNumber = 
RuntimeContextAdapter.getAttemptNumber(getRuntimeContext());
         if (attemptNumber > 0) {
             LOG.info("Send TieringFailoverEvent, current attempt number: {}", 
attemptNumber);
             // attempt number is greater than zero, the job must failover
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
new file mode 100644
index 000000000..e0989cb1e
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link StreamOperatorParameters} because the constructor is 
compatibility in flink
+ * 1.18 and 1.19. However, this constructor only used in test.
+ *
+ * <p>TODO: remove this class when no longer support flink 1.18 and 1.19.
+ */
+public class StreamOperatorParametersAdapter {
+
+    public static <OUT> StreamOperatorParameters<OUT> create(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output,
+            Supplier<ProcessingTimeService> processingTimeServiceFactory,
+            OperatorEventDispatcher operatorEventDispatcher,
+            MailboxExecutor mailboxExecutor) {
+        return new StreamOperatorParameters<>(
+                containingTask,
+                config,
+                output,
+                processingTimeServiceFactory,
+                operatorEventDispatcher,
+                mailboxExecutor);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 0be879e9f..618d45445 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.tiering.committer;
 
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.flink.adapter.StreamOperatorParametersAdapter;
 import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
 import org.apache.fluss.flink.tiering.TestingWriteResult;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
@@ -76,7 +77,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         MockOperatorEventDispatcher mockOperatorEventDispatcher =
                 new MockOperatorEventDispatcher(mockOperatorEventGateway);
         parameters =
-                new StreamOperatorParameters<>(
+                StreamOperatorParametersAdapter.create(
                         new SourceOperatorStreamTask<String>(new 
DummyEnvironment()),
                         new MockStreamConfig(new Configuration(), 1),
                         new MockOutput<>(new ArrayList<>()),

Reply via email to