This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c6936a1ea2 [GLUTEN-9538][Flink] Refine comments for Flink classes modifications (#9539)
c6936a1ea2 is described below

commit c6936a1ea22e4d71420a336bec84052545bd8a70
Author: shuai.xu <[email protected]>
AuthorDate: Wed May 7 15:26:57 2025 +0800

    [GLUTEN-9538][Flink] Refine comments for Flink classes modifications (#9539)
---
 .../flink/table/planner/plan/nodes/exec/common/CommonExecSink.java    | 4 ++--
 .../flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java    | 4 ++--
 .../planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java   | 4 ++--
 .../src/main/java/org/apache/flink/client/StreamGraphTranslator.java  | 4 ++--
 .../java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 2 ++
 .../runtime/translators/LegacySourceTransformationTranslator.java     | 4 ++--
 .../streaming/runtime/translators/SinkTransformationTranslator.java   | 4 ++--
 .../streaming/runtime/translators/SourceTransformationTranslator.java | 4 ++--
 .../src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java     | 2 +-
 9 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 561075f151..84cee3cf89 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -574,7 +574,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         if (rowtimeFieldIndex == -1) {
             return inputTransform;
         }
-        /// These codes are changed for gluten
+        // --- Begin Gluten-specific code changes ---
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
                 createTransformationMeta(
@@ -593,7 +593,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                 inputTransform.getOutputType(),
                 sinkParallelism,
                 sinkParallelismConfigured);
-        /// end gluten
+        // --- End Gluten-specific code changes ---
     }
 
     private InternalTypeInfo<RowData> getInputTypeInfo() {
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
index 640460f3c9..143dd888cc 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -111,7 +111,7 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<Row
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
-        /// These codes are changed for gluten
+        // --- Begin Gluten-specific code changes ---
         io.github.zhztheplayer.velox4j.type.RowType inputType =
                 (io.github.zhztheplayer.velox4j.type.RowType)
                         LogicalTypeConverter.toVLType(inputEdge.getOutputType());
@@ -145,6 +145,6 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<Row
                 InternalTypeInfo.of(getOutputType()),
                 inputTransform.getParallelism(),
                 false);
-        /// end gluten
+        // --- End Gluten-specific code changes ---
     }
 }
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index d3cbc0f66d..dd88a84061 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -112,7 +112,7 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
-        /// These codes are changed for gluten
+        // --- Begin Gluten-specific code changes ---
         final long idleTimeout =
                 config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
 
@@ -133,6 +133,6 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
                 InternalTypeInfo.of(getOutputType()),
                 inputTransform.getParallelism(),
                 false);
-        /// end gluten
+        // --- End Gluten-specific code changes ---
     }
 }
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
index 5ec72613b3..ce8fd23436 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
@@ -86,7 +86,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
         return pipeline instanceof StreamGraph;
     }
 
-    /// These codes are added for Gluten
+    // --- Begin Gluten-specific code changes ---
     private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
         for (JobVertex vertex : jobGraph.getVertices()) {
             StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
@@ -184,5 +184,5 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
         }
         return null;
     }
-    /// end gluten
+    // --- End Gluten-specific code changes ---
 }
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 101ec77371..59cc94d7e0 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -524,7 +524,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
         FileSystem.initialize(configuration, pluginManager);
 
         StateChangelogStorageLoader.initialize(pluginManager);
+        // --- Begin Gluten-specific code changes ---
         Velox4j.initialize();
+        // --- End Gluten-specific code changes ---
 
         int exitCode;
         Throwable throwable = null;
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
index ed0c252dd3..6b0893c8b8 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
@@ -80,7 +80,7 @@ public class LegacySourceTransformationTranslator<OUT>
         Function userFunction = transformation.getOperator().getUserFunction();
         StreamOperatorFactory<OUT> operatorFactory;
         String namePrefix = "";
-        /// These codes are changed for gluten
+        // --- Begin Gluten-specific code changes ---
         if (userFunction instanceof DataGeneratorSource) {
             RowType outputType = (RowType) LogicalTypeConverter.toVLType(
                     ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
@@ -100,7 +100,7 @@ public class LegacySourceTransformationTranslator<OUT>
         } else {
             operatorFactory = transformation.getOperatorFactory();
         }
-        /// end gluten
+        // --- End Gluten-specific code changes ---
         streamGraph.addLegacySource(
                 transformationId,
                 slotSharingGroup,
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 9e08f8a1ed..a5d4c0364d 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -177,7 +177,7 @@ public class SinkTransformationTranslator<Input, Output>
             if (sink instanceof SupportsCommitter) {
                 addCommittingTopology(sink, prewritten);
             } else {
-                /// These code are changed for gluten
+                // --- Begin Gluten-specific code changes ---
                 if (sink instanceof DiscardingSink) {
                     RowType outputType = (RowType) LogicalTypeConverter.toVLType(
                             ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
@@ -222,7 +222,7 @@ public class SinkTransformationTranslator<Input, Output>
                             false,
                             sink instanceof SupportsConcurrentExecutionAttempts);
                 }
-                /// end gluten
+                // --- End Gluten-specific code changes ---
             }
 
             getSinkTransformations(sizeBefore).forEach(context::transform);
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 20ce2e70a5..67f436c3b9 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -82,7 +82,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
         final int transformationId = transformation.getId();
         final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
 
-        /// These codes are changed for gluten
+        // --- Begin Gluten-specific code changes ---
         if (transformation.getSource().getClass().getSimpleName().equals("NexmarkSource")) {
             RowType outputType = (RowType) LogicalTypeConverter.toVLType(
                     ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
@@ -126,7 +126,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
                     transformation.getOutputType(),
                     "Source: " + transformation.getName());
         }
-        /// end gluten
+        // --- End Gluten-specific code changes ---
 
         final int parallelism =
                 transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
index 76aae49ede..576f2cc9eb 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
@@ -23,6 +23,6 @@ public class PlanNodeIdGenerator {
     private static final AtomicInteger ID_COUNTER = new AtomicInteger(0);
 
     public static String newId() {
-        return String.valueOf(ID_COUNTER.incrementAndGet());
+        return String.valueOf(ID_COUNTER.getAndIncrement());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to