This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ca4af649cf [flink] Adopt getTaskInfo() when acquiring parallelism info (#4583)
ca4af649cf is described below

commit ca4af649cf27f4d280c005cc77bee9c860d50bb1
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 20:11:46 2024 +0800

    [flink] Adopt getTaskInfo() when acquiring parallelism info (#4583)
---
 .../paimon/flink/utils/RuntimeContextUtils.java    | 32 ++++++++++++++++++++++
 .../paimon/flink/utils/RuntimeContextUtils.java    | 32 ++++++++++++++++++++++
 .../paimon/flink/utils/RuntimeContextUtils.java    | 32 ++++++++++++++++++++++
 .../paimon/flink/utils/RuntimeContextUtils.java    | 32 ++++++++++++++++++++++
 .../flink/service/QueryExecutorOperator.java       | 10 ++++---
 .../paimon/flink/sink/CommitterOperator.java       |  5 +++-
 .../flink/sink/HashBucketAssignerOperator.java     |  5 ++--
 .../sink/MultiTablesStoreCompactOperator.java      |  7 +++--
 .../paimon/flink/sink/StoreCompactOperator.java    |  7 +++--
 .../paimon/flink/sink/TableWriteOperator.java      |  5 ++--
 .../sink/index/GlobalIndexAssignerOperator.java    |  5 ++--
 .../flink/sink/index/IndexBootstrapOperator.java   |  5 ++--
 .../apache/paimon/flink/sorter/SortOperator.java   |  4 ++-
 .../source/AppendBypassCoordinateOperator.java     |  3 +-
 .../flink/source/BucketUnawareCompactSource.java   |  3 +-
 .../paimon/flink/utils/RuntimeContextUtils.java    | 32 ++++++++++++++++++++++
 .../flink/UnawareBucketAppendOnlyTableITCase.java  |  3 +-
 17 files changed, 201 insertions(+), 21 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+        return context.getNumberOfParallelSubtasks();
+    }
+
+    public static int getIndexOfThisSubtask(RuntimeContext context) {
+        return context.getIndexOfThisSubtask();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+        return context.getNumberOfParallelSubtasks();
+    }
+
+    public static int getIndexOfThisSubtask(RuntimeContext context) {
+        return context.getIndexOfThisSubtask();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+        return context.getNumberOfParallelSubtasks();
+    }
+
+    public static int getIndexOfThisSubtask(RuntimeContext context) {
+        return context.getIndexOfThisSubtask();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+        return context.getNumberOfParallelSubtasks();
+    }
+
+    public static int getIndexOfThisSubtask(RuntimeContext context) {
+        return context.getIndexOfThisSubtask();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
index 556c308396..bf0521d550 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.service.network.NetworkUtils;
@@ -77,8 +78,8 @@ public class QueryExecutorOperator extends 
AbstractStreamOperator<InternalRow>
         this.query = ((FileStoreTable) 
table).newLocalTableQuery().withIOManager(ioManager);
         KvQueryServer server =
                 new KvQueryServer(
-                        getRuntimeContext().getIndexOfThisSubtask(),
-                        getRuntimeContext().getNumberOfParallelSubtasks(),
+                        
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+                        
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
                         NetworkUtils.findHostAddress(),
                         Collections.singletonList(0).iterator(),
                         1,
@@ -96,8 +97,9 @@ public class QueryExecutorOperator extends 
AbstractStreamOperator<InternalRow>
         this.output.collect(
                 new StreamRecord<>(
                         GenericRow.of(
-                                
getRuntimeContext().getNumberOfParallelSubtasks(),
-                                getRuntimeContext().getIndexOfThisSubtask(),
+                                
RuntimeContextUtils.getNumberOfParallelSubtasks(
+                                        getRuntimeContext()),
+                                
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                                 BinaryString.fromString(address.getHostName()),
                                 address.getPort())));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 2ec90b8c6c..021a5db413 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -129,7 +130,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
         super.initializeState(context);
 
         Preconditions.checkArgument(
-                !forceSingleParallelism || 
getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                !forceSingleParallelism
+                        || 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())
+                                == 1,
                 "Committer Operator parallelism in paimon MUST be one.");
 
         this.currentWatermark = Long.MIN_VALUE;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 70fac7a83e..0c101c6d1e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.flink.ProcessRecordAttributesUtil;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.index.BucketAssigner;
 import org.apache.paimon.index.HashBucketAssigner;
 import org.apache.paimon.index.SimpleHashBucketAssigner;
@@ -76,8 +77,8 @@ public class HashBucketAssignerOperator<T> extends 
AbstractStreamOperator<Tuple2
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
-        int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-        int taskId = getRuntimeContext().getIndexOfThisSubtask();
+        int numberTasks = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+        int taskId = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         this.assigner =
                 overwrite
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 7cb5d30c2f..8a1d3a02df 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.options.Options;
@@ -109,8 +110,10 @@ public class MultiTablesStoreCompactOperator
                                 ChannelComputer.select(
                                                 partition,
                                                 bucket,
-                                                
getRuntimeContext().getNumberOfParallelSubtasks())
-                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+                                                
RuntimeContextUtils.getNumberOfParallelSubtasks(
+                                                        getRuntimeContext()))
+                                        == 
RuntimeContextUtils.getIndexOfThisSubtask(
+                                                getRuntimeContext()));
 
         tables = new HashMap<>();
         writes = new HashMap<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 9b152a81ca..ac10345bc4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.options.Options;
@@ -92,8 +93,10 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
                                 ChannelComputer.select(
                                                 partition,
                                                 bucket,
-                                                
getRuntimeContext().getNumberOfParallelSubtasks())
-                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+                                                
RuntimeContextUtils.getNumberOfParallelSubtasks(
+                                                        getRuntimeContext()))
+                                        == 
RuntimeContextUtils.getIndexOfThisSubtask(
+                                                getRuntimeContext()));
         write =
                 storeSinkWriteProvider.provide(
                         table,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 67b4720e29..32fcdd03bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.ProcessRecordAttributesUtil;
 import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
@@ -58,14 +59,14 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
         super.initializeState(context);
 
         boolean containLogSystem = containLogSystem();
-        int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int numTasks = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
         StateValueFilter stateFilter =
                 (tableName, partition, bucket) -> {
                     int task =
                             containLogSystem
                                     ? ChannelComputer.select(bucket, numTasks)
                                     : ChannelComputer.select(partition, 
bucket, numTasks);
-                    return task == getRuntimeContext().getIndexOfThisSubtask();
+                    return task == 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
                 };
 
         state = createState(context, stateFilter);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 7fee3f45f3..99cce07fdc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.crosspartition.GlobalIndexAssigner;
 import org.apache.paimon.crosspartition.KeyPartOrRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -59,8 +60,8 @@ public class GlobalIndexAssignerOperator
         assigner.open(
                 computeManagedMemory(this),
                 ioManager,
-                getRuntimeContext().getNumberOfParallelSubtasks(),
-                getRuntimeContext().getIndexOfThisSubtask(),
+                
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+                RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                 this::collect);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
index 501e35dff4..5c8ba8f944 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.index;
 import org.apache.paimon.crosspartition.IndexBootstrap;
 import org.apache.paimon.crosspartition.KeyPartOrRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -50,8 +51,8 @@ public class IndexBootstrapOperator<T> extends 
AbstractStreamOperator<Tuple2<Key
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
         bootstrap.bootstrap(
-                getRuntimeContext().getNumberOfParallelSubtasks(),
-                getRuntimeContext().getIndexOfThisSubtask(),
+                
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+                RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                 this::collect);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index d4d5dd7416..b6847125fb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.types.RowType;
@@ -79,7 +80,8 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
     public void open() throws Exception {
         super.open();
         initBuffer();
-        if (sinkParallelism != 
getRuntimeContext().getNumberOfParallelSubtasks()) {
+        if (sinkParallelism
+                != 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())) {
             throw new IllegalArgumentException(
                     "Please ensure that the runtime parallelism of the sink 
matches the initial configuration "
                             + "to avoid potential issues with skewed range 
partitioning.");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
index 668aa24c14..45090f7b68 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.ExecutorUtils;
 
@@ -67,7 +68,7 @@ public class AppendBypassCoordinateOperator<CommitT>
     public void open() throws Exception {
         super.open();
         checkArgument(
-                getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
                 "Compaction Coordinator parallelism in paimon MUST be one.");
         long intervalMs = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
         this.compactTasks = new LinkedBlockingQueue<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index e768c717dd..79ee827fe6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
 import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -87,7 +88,7 @@ public class BucketUnawareCompactSource extends 
RichSourceFunction<UnawareAppend
         compactionCoordinator =
                 new UnawareAppendTableCompactionCoordinator(table, streaming, 
filter);
         Preconditions.checkArgument(
-                this.getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
                 "Compaction Operator parallelism in paimon MUST be one.");
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..34e0d041b6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+        return context.getTaskInfo().getNumberOfParallelSubtasks();
+    }
+
+    public static int getIndexOfThisSubtask(RuntimeContext context) {
+        return context.getTaskInfo().getIndexOfThisSubtask();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index cb323542d4..f6dfb1b230 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.reader.RecordReader;
@@ -395,7 +396,7 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
 
         @Override
         public void run(SourceContext<Integer> sourceContext) throws Exception 
{
-            int taskId = getRuntimeContext().getIndexOfThisSubtask();
+            int taskId = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
             // wait some time in parallelism #2,
             // so that it does not commit in the same checkpoint with 
parallelism #1
             int waitCount = (taskId == 0 ? 0 : 10);

Reply via email to