BIOINSu commented on code in PR #24128:
URL: https://github.com/apache/flink/pull/24128#discussion_r1462818847
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
##########
@@ -105,54 +112,92 @@ protected Transformation<RowData> translateToPlanInternal(
planner.getFlinkContext(),
ShortcutUtils.unwrapTypeFactory(planner));
ScanTableSource.ScanRuntimeProvider provider =
tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ final int sourceParallelism = deriveSourceParallelism(provider);
+ sourceParallelismConfigured = isParallelismConfigured(provider);
if (provider instanceof SourceFunctionProvider) {
final SourceFunctionProvider sourceFunctionProvider =
(SourceFunctionProvider) provider;
final SourceFunction<RowData> function =
sourceFunctionProvider.createSourceFunction();
- final Transformation<RowData> transformation =
+ sourceTransform =
createSourceFunctionTransformation(
env,
function,
sourceFunctionProvider.isBounded(),
meta.getName(),
- outputTypeInfo);
- return meta.fill(transformation);
+ outputTypeInfo,
+ sourceParallelism);
+ meta.fill(sourceTransform);
} else if (provider instanceof InputFormatProvider) {
final InputFormat<RowData, ?> inputFormat =
((InputFormatProvider) provider).createInputFormat();
- final Transformation<RowData> transformation =
+ sourceTransform =
createInputFormatTransformation(
env, inputFormat, outputTypeInfo, meta.getName());
- return meta.fill(transformation);
+ sourceTransform.setParallelism(sourceParallelism,
sourceParallelismConfigured);
+ meta.fill(sourceTransform);
} else if (provider instanceof SourceProvider) {
final Source<RowData, ?, ?> source = ((SourceProvider)
provider).createSource();
// TODO: Push down watermark strategy to source scan
- final Transformation<RowData> transformation =
+ sourceTransform =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
meta.getName(),
outputTypeInfo)
.getTransformation();
- return meta.fill(transformation);
+ sourceTransform.setParallelism(sourceParallelism,
sourceParallelismConfigured);
+ meta.fill(sourceTransform);
} else if (provider instanceof DataStreamScanProvider) {
- Transformation<RowData> transformation =
+ sourceTransform =
((DataStreamScanProvider) provider)
.produceDataStream(createProviderContext(config),
env)
.getTransformation();
- meta.fill(transformation);
- transformation.setOutputType(outputTypeInfo);
- return transformation;
+ sourceTransform.setParallelism(sourceParallelism,
sourceParallelismConfigured);
Review Comment:
Thanks for the reminder. I have changed this so that, if the provider is an
instance of `SourceFunctionProvider`, the parallelism is set directly.
Otherwise, whether the parallelism of the source is set depends on whether
the parallelism is configured within the provider.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]