zhangjun0x01 commented on code in PR #584:
URL: https://github.com/apache/flink-table-store/pull/584#discussion_r1132181472
##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java:
##########
@@ -162,15 +164,47 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.withProjection(projectFields)
.withPredicate(predicate)
.withLimit(limit)
- .withParallelism(
- Options.fromMap(table.schema().options())
- .get(FlinkConnectorOptions.SCAN_PARALLELISM))
+ .withParallelism(inferParallelism(table, predicate, limit, streaming))
.withWatermarkStrategy(watermarkStrategy);
return new TableStoreDataStreamScanProvider(
!streaming, env -> sourceBuilder.withEnv(env).build());
}
+ private Integer inferParallelism(
+ FileStoreTable table, Predicate predicate, Long limitCount, boolean streaming) {
+ Options options = Options.fromMap(this.table.schema().options());
+ Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+
+ if (options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+ // for streaming mode, set the default parallelism to the bucket number.
+ if (streaming) {
+ parallelism = options.get(CoreOptions.BUCKET);
+ } else {
+ int splitSize = table.newScan().withFilter(predicate).plan().splits.size();
Review Comment:
My idea is that the number of splits is related to the data size of the table.
If we use the bucket number instead, I am worried that the data size of each
bucket may be hard to balance (data skew would make this worse). Could we add a
`totalDataSize` to the snapshot and then use `totalDataSize / 1G` to infer the
parallelism? Alternatively, the divisor could be specified by the user,
depending on how much data the user wants each read operator to read.
In addition, the snapshot's `totalDataSize` would be similar to
`totalRecordCount`, which is also a snapshot metric.
What do you think?
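
To make the proposal concrete, here is a minimal sketch of the `totalDataSize / 1G` rule. It is not code from this PR: `totalDataSize` is the snapshot field proposed above and does not exist yet, and the class name `ParallelismSketch`, the `bytesPerReader` parameter (the user-specified divisor), and the `maxParallelism` cap are hypothetical names I made up for illustration.

```java
/** Illustrative only; none of these names exist in flink-table-store. */
final class ParallelismSketch {

    /** 1 GiB per reader, the default divisor suggested in the comment above. */
    static final long DEFAULT_BYTES_PER_READER = 1L << 30;

    /**
     * Infers the scan parallelism from the total data size of a snapshot.
     *
     * @param totalDataSize total bytes in the snapshot (the proposed metric)
     * @param bytesPerReader how many bytes each read operator should handle
     * @param maxParallelism upper bound, an assumption added for safety
     */
    static int inferParallelism(long totalDataSize, long bytesPerReader, int maxParallelism) {
        if (bytesPerReader <= 0 || maxParallelism <= 0) {
            throw new IllegalArgumentException(
                    "bytesPerReader and maxParallelism must be positive");
        }
        if (totalDataSize <= 0) {
            // An empty table still needs one reader.
            return 1;
        }
        // Ceiling division written so that it cannot overflow.
        long readers = (totalDataSize - 1) / bytesPerReader + 1;
        return (int) Math.min(readers, (long) maxParallelism);
    }
}
```

With the default divisor, a 2.5 GiB snapshot would get 3 readers, and a very large snapshot would be capped at `maxParallelism`. Rounding up rather than down keeps each reader at or below the target size.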