BIOINSu commented on code in PR #24128:
URL: https://github.com/apache/flink/pull/24128#discussion_r1464649772


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml:
##########
@@ -727,4 +727,60 @@ Calc(select=[ts, a, b], where=[>(a, 1)], 
changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
+
+  <TestCase name="testSetParallelismForSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM src WHERE the_month > 1]]>
+    </Resource>
+       <Resource name="ast">
+      <![CDATA[
+LogicalProject(the_month=[$0], area=[$1], product_id=[$2])
++- LogicalFilter(condition=[>($0, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[the_month, area, product_id], where=[(the_month > 1)])
++- TableSourceScan(table=[[default_catalog, default_database, src, 
filter=[]]], fields=[the_month, area, product_id])
+]]>
+       </Resource>
+       <Resource name="transformation">
+      <![CDATA[
+OneInputTransformation{id=3, name='Calc[4]', outputType=ROW<`the_month` INT, 
`area` STRING, `product_id` INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
++- SourceTransformationWrapper{id=2, name='ChangeToDefaultParallel', 
outputType=ROW<`the_month` INT, `area` STRING, `product_id` 
INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+       +- LegacySourceTransformation{id=1, name='src[3]', 
outputType=ROW<`the_month` INT, `area` STRING, `product_id` 
INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=3}
+]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testSetParallelismForChangelogSource">
+       <Resource name="sql">
+      <![CDATA[SELECT * FROM src WHERE the_month > 1]]>
+       </Resource>
+       <Resource name="ast">
+      <![CDATA[
+LogicalProject(the_month=[$0], area=[$1], product_id=[$2])
++- LogicalFilter(condition=[>($0, 1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+       </Resource>
+       <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[the_month, area, product_id], where=[(the_month > 1)])
++- ChangelogNormalize(key=[product_id])
+   +- Exchange(distribution=[hash[product_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
filter=[]]], fields=[the_month, area, product_id])
+]]>
+       </Resource>
+       <Resource name="transformation">
+      <![CDATA[
+OneInputTransformation{id=6, name='Calc[8]', outputType=ROW<`the_month` INT, 
`area` STRING, `product_id` INT NOT NULL>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+       +- OneInputTransformation{id=5, name='ChangelogNormalize[7]', 
outputType=ROW<`the_month` INT, `area` STRING, `product_id` INT NOT 
NULL>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+               +- PartitionTransformation{id=4, name='Exchange[6]', 
outputType=ROW<`the_month` INT, `area` STRING, `product_id` INT NOT 
NULL>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+                       +- PartitionTransformation{id=3, name='Partitioner[5]', 
outputType=ROW<`the_month` INT, `area` STRING, `product_id` INT NOT 
NULL>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}

Review Comment:
   Yes, I have run real jobs and verified that only one hash shuffle is 
produced. The reasons are as follows:
   1. `PartitionTransformationTranslator` adds each `PartitionTransformation` 
as a virtual partition node in the stream graph.
   2. When there are multiple consecutive `PartitionTransformation`s, 
`StreamGraph.java#addEdgeInternal` uses the first partitioner it encounters 
while walking upstream as the real partitioner and ignores the others. 
Details are as follows:
   ```java
   ...
   else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
       int virtualId = upStreamVertexID;
       upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
       if (partitioner == null) {
           partitioner = virtualPartitionNodes.get(virtualId).f1;
       }
       exchangeMode = virtualPartitionNodes.get(virtualId).f2;
       addEdgeInternal(
               upStreamVertexID,
               downStreamVertexID,
               typeNumber,
               partitioner,
               outputNames,
               outputTag,
               exchangeMode,
               intermediateDataSetId);
   }
   ...
   ```
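
To make the resolution rule easier to follow, here is a minimal, self-contained sketch of the same idea. It is not Flink's actual code: the class `VirtualPartitionSketch`, its methods, the vertex ids, and the partitioner names are all hypothetical, and partitioners are modelled as plain strings. It only mirrors the `partitioner == null` check from the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of virtual partition nodes; not Flink's real classes. */
public class VirtualPartitionSketch {

    /** A virtual node stores the id of its input and the partitioner it carries. */
    static final class VirtualNode {
        final int inputId;
        final String partitioner;

        VirtualNode(int inputId, String partitioner) {
            this.inputId = inputId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();

    public void addVirtualPartitionNode(int virtualId, int inputId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualNode(inputId, partitioner));
    }

    /**
     * Walks upstream through consecutive virtual nodes until a physical vertex is reached.
     * Only the first partitioner seen (the one closest to the downstream vertex) is kept.
     */
    public String resolveEdge(int upstreamId, int downstreamId, String partitioner) {
        if (virtualPartitionNodes.containsKey(upstreamId)) {
            VirtualNode node = virtualPartitionNodes.get(upstreamId);
            if (partitioner == null) {
                // Same guard as in the excerpt: later virtual nodes cannot override it.
                partitioner = node.partitioner;
            }
            return resolveEdge(node.inputId, downstreamId, partitioner);
        }
        // A single physical edge results, regardless of how many virtual nodes were chained.
        return upstreamId + " -> " + downstreamId + " [" + partitioner + "]";
    }

    public static void main(String[] args) {
        VirtualPartitionSketch graph = new VirtualPartitionSketch();
        // Hypothetical ids: physical source 1, two chained virtual nodes 2 and 3, operator 4.
        graph.addVirtualPartitionNode(2, 1, "upstream-partitioner");
        graph.addVirtualPartitionNode(3, 2, "downstream-partitioner");
        System.out.println(graph.resolveEdge(3, 4, null));
    }
}
```

With two chained virtual nodes, the sketch yields one edge from the physical source to the downstream operator, which matches the single shuffle seen in the jobs below.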
   Real job with an insert-only source:
   ![download (2)](https://github.com/apache/flink/assets/23656175/c4f0b699-4f97-48f0-930e-b44a80bae3d4)
   
   Real job with an upsert source:
   ![download (3)](https://github.com/apache/flink/assets/23656175/d82ff578-bdaf-4fa8-aa4f-ace966bdc5bd)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
