This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f920b22 [flink] Adjust the setMaxParallelism api call in QueryService (#3377)
d3f920b22 is described below

commit d3f920b22910c7fe6fbc84ef787a68fc385d5fed
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 17:15:51 2024 +0800

    [flink] Adjust the setMaxParallelism api call in QueryService (#3377)
---
 .../apache/paimon/flink/service/QueryFileMonitor.java |  5 +++++
 .../org/apache/paimon/flink/service/QueryService.java | 19 +++++++++++--------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 4cd34ab26..43cf654e9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -142,5 +142,10 @@ public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
             int bucket = row.getInt(2);
             return ChannelComputer.select(partition, bucket, numChannels);
         }
+
+        @Override
+        public String toString() {
+            return "FileMonitorChannelComputer{" + "numChannels=" + numChannels + '}';
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
index 03531d757..bd433fe0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -55,13 +56,15 @@ public class QueryService {
         stream = partition(stream, QueryFileMonitor.createChannelComputer(), parallelism);
 
         QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);
-        stream.transform(
-                        "Executor",
-                        InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
-                        executorOperator)
-                .setParallelism(parallelism)
-                .addSink(new QueryAddressRegister(table))
-                .setParallelism(1)
-                .setMaxParallelism(1);
+        DataStreamSink<?> sink =
+                stream.transform(
+                                "Executor",
+                                InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
+                                executorOperator)
+                        .setParallelism(parallelism)
+                        .addSink(new QueryAddressRegister(table))
+                        .setParallelism(1);
+
+        sink.getTransformation().setMaxParallelism(1);
     }
 }

Reply via email to