This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c6936a1ea2 [GLUTEN-9538][Flink] Refine comments for Flink classes modifications (#9539)
c6936a1ea2 is described below
commit c6936a1ea22e4d71420a336bec84052545bd8a70
Author: shuai.xu <[email protected]>
AuthorDate: Wed May 7 15:26:57 2025 +0800
[GLUTEN-9538][Flink] Refine comments for Flink classes modifications (#9539)
---
.../flink/table/planner/plan/nodes/exec/common/CommonExecSink.java | 4 ++--
.../flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java | 4 ++--
.../planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java | 4 ++--
.../src/main/java/org/apache/flink/client/StreamGraphTranslator.java | 4 ++--
.../java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 2 ++
.../runtime/translators/LegacySourceTransformationTranslator.java | 4 ++--
.../streaming/runtime/translators/SinkTransformationTranslator.java | 4 ++--
.../streaming/runtime/translators/SourceTransformationTranslator.java | 4 ++--
.../src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java | 2 +-
9 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 561075f151..84cee3cf89 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -574,7 +574,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
if (rowtimeFieldIndex == -1) {
return inputTransform;
}
- /// These codes are changed for gluten
+ // --- Begin Gluten-specific code changes ---
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationMeta(
@@ -593,7 +593,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
inputTransform.getOutputType(),
sinkParallelism,
sinkParallelismConfigured);
- /// end gluten
+ // --- End Gluten-specific code changes ---
}
private InternalTypeInfo<RowData> getInputTypeInfo() {
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
index 640460f3c9..143dd888cc 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -111,7 +111,7 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<Row
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
- /// These codes are changed for gluten
+ // --- Begin Gluten-specific code changes ---
io.github.zhztheplayer.velox4j.type.RowType inputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(inputEdge.getOutputType());
@@ -145,6 +145,6 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<Row
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism(),
false);
- /// end gluten
+ // --- End Gluten-specific code changes ---
}
}
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index d3cbc0f66d..dd88a84061 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -112,7 +112,7 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
- /// These codes are changed for gluten
+ // --- Begin Gluten-specific code changes ---
final long idleTimeout =
config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
@@ -133,6 +133,6 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism(),
false);
- /// end gluten
+ // --- End Gluten-specific code changes ---
}
}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
index 5ec72613b3..ce8fd23436 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
@@ -86,7 +86,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
return pipeline instanceof StreamGraph;
}
- /// These codes are added for Gluten
+ // --- Begin Gluten-specific code changes ---
private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
for (JobVertex vertex : jobGraph.getVertices()) {
StreamConfig streamConfig = new
StreamConfig(vertex.getConfiguration());
@@ -184,5 +184,5 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
}
return null;
}
- /// end gluten
+ // --- End Gluten-specific code changes ---
}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 101ec77371..59cc94d7e0 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -524,7 +524,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
FileSystem.initialize(configuration, pluginManager);
StateChangelogStorageLoader.initialize(pluginManager);
+ // --- Begin Gluten-specific code changes ---
Velox4j.initialize();
+ // --- End Gluten-specific code changes ---
int exitCode;
Throwable throwable = null;
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
index ed0c252dd3..6b0893c8b8 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
@@ -80,7 +80,7 @@ public class LegacySourceTransformationTranslator<OUT>
Function userFunction = transformation.getOperator().getUserFunction();
StreamOperatorFactory<OUT> operatorFactory;
String namePrefix = "";
- /// These codes are changed for gluten
+ // --- Begin Gluten-specific code changes ---
if (userFunction instanceof DataGeneratorSource) {
RowType outputType = (RowType) LogicalTypeConverter.toVLType(
((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
@@ -100,7 +100,7 @@ public class LegacySourceTransformationTranslator<OUT>
} else {
operatorFactory = transformation.getOperatorFactory();
}
- /// end gluten
+ // --- End Gluten-specific code changes ---
streamGraph.addLegacySource(
transformationId,
slotSharingGroup,
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 9e08f8a1ed..a5d4c0364d 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -177,7 +177,7 @@ public class SinkTransformationTranslator<Input, Output>
if (sink instanceof SupportsCommitter) {
addCommittingTopology(sink, prewritten);
} else {
- /// These code are changed for gluten
+ // --- Begin Gluten-specific code changes ---
if (sink instanceof DiscardingSink) {
RowType outputType = (RowType)
LogicalTypeConverter.toVLType(
((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
@@ -222,7 +222,7 @@ public class SinkTransformationTranslator<Input, Output>
false,
sink instanceof
SupportsConcurrentExecutionAttempts);
}
- /// end gluten
+ // --- End Gluten-specific code changes ---
}
getSinkTransformations(sizeBefore).forEach(context::transform);
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 20ce2e70a5..67f436c3b9 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -82,7 +82,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig =
streamGraph.getExecutionConfig();
- /// These codes are changed for gluten
+ // --- Begin Gluten-specific code changes ---
if
(transformation.getSource().getClass().getSimpleName().equals("NexmarkSource"))
{
RowType outputType = (RowType) LogicalTypeConverter.toVLType(
((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
@@ -126,7 +126,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
transformation.getOutputType(),
"Source: " + transformation.getName());
}
- /// end gluten
+ // --- End Gluten-specific code changes ---
final int parallelism =
transformation.getParallelism() !=
ExecutionConfig.PARALLELISM_DEFAULT
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
index 76aae49ede..576f2cc9eb 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/PlanNodeIdGenerator.java
@@ -23,6 +23,6 @@ public class PlanNodeIdGenerator {
private static final AtomicInteger ID_COUNTER = new AtomicInteger(0);
public static String newId() {
- return String.valueOf(ID_COUNTER.incrementAndGet());
+ return String.valueOf(ID_COUNTER.getAndIncrement());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]