This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 78a8509372a745b41fa460903cfdb16764c302b8
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 17:15:51 2024 +0800

    [flink] Adjust the setMaxParallelism api call in QueryService (#3377)
---
 .../apache/paimon/flink/service/QueryFileMonitor.java |  5 +++++
 .../org/apache/paimon/flink/service/QueryService.java | 19 +++++++++++--------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 4cd34ab26..43cf654e9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -142,5 +142,10 @@ public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
             int bucket = row.getInt(2);
             return ChannelComputer.select(partition, bucket, numChannels);
         }
+
+        @Override
+        public String toString() {
+            return "FileMonitorChannelComputer{" + "numChannels=" + numChannels + '}';
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
index 8a4814d0a..2066a1513 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -55,13 +56,15 @@ public class QueryService {
         stream = partition(stream, QueryFileMonitor.createChannelComputer(), parallelism);
 
         QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);
-        stream.transform(
-                        "Executor",
-                        InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
-                        executorOperator)
-                .setParallelism(parallelism)
-                .addSink(new QueryAddressRegister(table))
-                .setParallelism(1)
-                .setMaxParallelism(1);
+        DataStreamSink<?> sink =
+                stream.transform(
+                                "Executor",
+                                InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
+                                executorOperator)
+                        .setParallelism(parallelism)
+                        .addSink(new QueryAddressRegister(table))
+                        .setParallelism(1);
+
+        sink.getTransformation().setMaxParallelism(1);
     }
 }
