Checking with the community again: has anyone explored this before? Thanks.
On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
> Dear Flink developers,
>
> I wanted to check whether there is a way to control the parallelism of the
> auto-generated operators in a Flink SQL job graph.
>
> In the Java API, it is possible to have full control over the parallelism of
> each operator.
>
> In Flink SQL, some source and sink connectors support `source.parallelism`
> and `sink.parallelism`, and the rest can be set via `parallelism.default`.
>
> In this particular scenario, `enhancedEvents` gets chained to the
> KafkaSource operator. It can be separated by calling `disableChaining()` on the
> KafkaSource stream on the Kafka connector side, but even with chaining
> disabled on the source stream, the parallelism of the `enhancedEvents` operator
> is still set to 5 (the same as the Kafka source operator's parallelism) instead
> of 3 (the default parallelism):
>
> ```sql
> SET 'parallelism.default' = '3';
>
> CREATE TABLE input_kafka_table
> (
>   ...
>   ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT), 3),
>   WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> ) WITH (
>   'connector' = 'kafka',
>   'source.parallelism' = '5', -- supported via a customization of the Kafka connector
>   ...
> );
>
> CREATE TEMPORARY VIEW enhancedEvents AS (
>   SELECT x, y
>   FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y))
> );
>
> CREATE TABLE other_table_source (...) WITH (...);
> CREATE TABLE other_table_sink (...) WITH (...);
>
> BEGIN STATEMENT SET;
> INSERT INTO enhancedEventsSink (SELECT * FROM enhancedEvents);
> INSERT INTO other_table_sink (SELECT z FROM other_table_source);
> END;
> ```
>
> Is there a way to force-override the parallelism of auto-generated operators
> in a Flink SQL pipeline?
>
> Or is it expected behavior that some operators take their parallelism not
> from the default parallelism but from another operator's parallelism?
>
> I want to understand whether this is a bug or intended behavior.
>
> Thank you.
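For reference, the parallelism-related settings discussed above can be collected in one place. This is a hypothetical sketch, not a confirmed fix: it adds Flink's `table.exec.resource.default-parallelism` option (a default parallelism for operators generated by the table planner; `-1`, the default, falls back to the environment parallelism), and it declares `enhancedEventsSink`, which the original example leaves undeclared, with an illustrative schema and placeholder connector values. Whether any of these settings changes the parallelism of the operator generated for `enhancedEvents` is exactly the open question in this thread.

```sql
-- Job-wide default parallelism (as in the original example).
SET 'parallelism.default' = '3';

-- Default parallelism for planner-generated operators;
-- -1 (the default) falls back to the environment parallelism.
SET 'table.exec.resource.default-parallelism' = '3';

-- Hypothetical sink declaration: columns and connector values are
-- placeholders for illustration only.
CREATE TABLE enhancedEventsSink (
  x STRING,
  y STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...',
  'format' = 'json',
  'sink.parallelism' = '3'  -- applies to the sink operator only
);
```

Note that `sink.parallelism` only affects the sink operator itself; the intermediate operator produced by the `LATERAL TABLE` join is not directly configurable through connector options.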