leonardBang commented on a change in pull request #11853:
URL: https://github.com/apache/flink/pull/11853#discussion_r414634883
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
##########
@@ -71,6 +71,12 @@
.withDescription("The default partition name in case the dynamic partition" +
" column value is null/empty string");
+ public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = key("sink.shuffle-by-partition.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Before sink, can shuffle by dynamic partition fields to sink parallelisms," +
+ " this can greatly reduce the number of files. But will lead to data skew too.");
+
Review comment:
How about this? "The option to enable shuffling data by dynamic partition fields in the sink phase. This can greatly reduce the number of files for the filesystem sink, but may lead to data skew. Disabled by default."
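To make the trade-off in the suggested description concrete, here is a small self-contained Java simulation (not Flink code). It assumes, purely for illustration, that without the shuffle each row reaches a sink subtask round-robin, and that every subtask writes one file per partition value it receives. `PartitionShuffleSketch`, `countFiles`, and `maxLoad` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionShuffleSketch {

    // A row tagged with the value of its dynamic partition column (hypothetical model).
    static final class Row {
        final int id;
        final String partition;

        Row(int id, String partition) {
            this.id = id;
            this.partition = partition;
        }
    }

    // Assumed baseline: without the shuffle, rows reach sink subtasks round-robin by id.
    static int roundRobinSubtask(Row row, int parallelism) {
        return row.id % parallelism;
    }

    // With the shuffle: all rows sharing a partition value land on the same subtask.
    static int partitionSubtask(Row row, int parallelism) {
        return Math.floorMod(row.partition.hashCode(), parallelism);
    }

    static int subtaskOf(Row row, int parallelism, boolean shuffle) {
        return shuffle ? partitionSubtask(row, parallelism) : roundRobinSubtask(row, parallelism);
    }

    // Each subtask writes one file per partition it sees: count distinct (subtask, partition) pairs.
    static int countFiles(List<Row> rows, int parallelism, boolean shuffle) {
        Set<String> files = new HashSet<>();
        for (Row row : rows) {
            files.add(subtaskOf(row, parallelism, shuffle) + "/" + row.partition);
        }
        return files.size();
    }

    // Largest number of rows handled by one subtask; a hot partition inflates this when shuffling.
    static int maxLoad(List<Row> rows, int parallelism, boolean shuffle) {
        int[] load = new int[parallelism];
        for (Row row : rows) {
            load[subtaskOf(row, parallelism, shuffle)]++;
        }
        int max = 0;
        for (int n : load) {
            max = Math.max(max, n);
        }
        return max;
    }

    // Rows spread evenly over partition values p0, p1, ..., p(partitions - 1).
    static List<Row> uniformRows(int count, int partitions) {
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            rows.add(new Row(i, "p" + (i % partitions)));
        }
        return rows;
    }

    // The first `hot` rows share one partition value; the remaining rows share another.
    static List<Row> skewedRows(int count, int hot) {
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            rows.add(new Row(i, i < hot ? "hot" : "cold"));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> uniform = uniformRows(100, 5);
        System.out.println("files without shuffle: " + countFiles(uniform, 4, false));
        System.out.println("files with shuffle:    " + countFiles(uniform, 4, true));
        List<Row> skewed = skewedRows(100, 90);
        System.out.println("max load without shuffle: " + maxLoad(skewed, 4, false));
        System.out.println("max load with shuffle:    " + maxLoad(skewed, 4, true));
    }
}
```

With 100 rows over 5 partition values and parallelism 4, the round-robin baseline opens 20 files while the partition shuffle opens 5. With a 90-row hot partition, the shuffle puts at least 90 rows on a single subtask, versus 25 per subtask without it.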
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala
##########
@@ -53,15 +53,20 @@ class StreamExecSinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
- if (partitionSink.configurePartitionGrouping(false)) {
throw new TableException("Partition grouping in stream mode is not supported yet!")
- }
+ val shuffleEnable = sinkNode
+ .catalogTable
+ .getProperties
+ .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key())
- if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {
Review comment:
`DataStreamTableSink` is a special type. Won't this change let every stream sink node pick up the shuffle-enable config?
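A minimal Java sketch of one way to keep the option scoped, assuming the intent is to keep `DataStreamTableSink` out of the shuffle path: read the raw string property (which is `null` when the key is unset) and fall back to the option's declared default of `false`. `ShuffleOptionSketch`, `shuffleEnabled`, and `shouldShuffle` are hypothetical helpers, not planner APIs, and the `isDataStreamTableSink` flag stands in for the Scala `isInstanceOf` check.

```java
import java.util.HashMap;
import java.util.Map;

public class ShuffleOptionSketch {

    // Key copied from the diff; the fallback below mirrors .defaultValue(false).
    static final String SHUFFLE_KEY = "sink.shuffle-by-partition.enable";

    // Catalog properties are raw strings; a missing key yields null, which parseBoolean maps to false.
    // Note: parseBoolean also maps malformed values to false, which may differ from Flink's own parsing.
    static boolean shuffleEnabled(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.get(SHUFFLE_KEY));
    }

    // Hypothetical guard: honour the option only for sinks that are not DataStreamTableSink.
    static boolean shouldShuffle(Map<String, String> properties, boolean isDataStreamTableSink) {
        return !isDataStreamTableSink && shuffleEnabled(properties);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(SHUFFLE_KEY, "true");
        System.out.println(shouldShuffle(properties, false));
        System.out.println(shouldShuffle(properties, true));
    }
}
```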
----------------------------------------------------------------
This is an automated message from the Apache Git Service.