This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d3f920b22 [flink] Adjust the setMaxParallelism api call in QueryService (#3377)
d3f920b22 is described below
commit d3f920b22910c7fe6fbc84ef787a68fc385d5fed
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 17:15:51 2024 +0800
[flink] Adjust the setMaxParallelism api call in QueryService (#3377)
---
.../apache/paimon/flink/service/QueryFileMonitor.java | 5 +++++
.../org/apache/paimon/flink/service/QueryService.java | 19 +++++++++++--------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 4cd34ab26..43cf654e9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -142,5 +142,10 @@ public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
int bucket = row.getInt(2);
return ChannelComputer.select(partition, bucket, numChannels);
}
+
+ @Override
+ public String toString() {
+ return "FileMonitorChannelComputer{" + "numChannels=" + numChannels + '}';
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
index 03531d757..bd433fe0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -55,13 +56,15 @@ public class QueryService {
stream = partition(stream, QueryFileMonitor.createChannelComputer(), parallelism);
QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);
- stream.transform(
- "Executor",
- InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
- executorOperator)
- .setParallelism(parallelism)
- .addSink(new QueryAddressRegister(table))
- .setParallelism(1)
- .setMaxParallelism(1);
+ DataStreamSink<?> sink =
+ stream.transform(
+ "Executor",
+ InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
+ executorOperator)
+ .setParallelism(parallelism)
+ .addSink(new QueryAddressRegister(table))
+ .setParallelism(1);
+
+ sink.getTransformation().setMaxParallelism(1);
}
}