stevenzwu commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r697916071
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -369,6 +373,27 @@ private String operatorName(String suffix) {
default:
throw new RuntimeException("Unrecognized write.distribution-mode: "
+ writeMode);
}
+
+ if (keySelector != null) {
+ return input.keyBy(keySelector);
+ }
+ return input;
+ }
+
+ private KeySelector<RowData, String> getKeySelector(List<Integer> equalityFieldIds, PartitionSpec partitionSpec,
+ Schema schema, RowType rowType) {
+ boolean hasPrimaryKey = equalityFieldIds != null && !equalityFieldIds.isEmpty();
+ boolean hasPartitionKey = partitionSpec != null && !partitionSpec.isUnpartitioned();
+
+ if (hasPrimaryKey && hasPartitionKey) {
+ return new CombinedKeySelector(partitionSpec, equalityFieldIds, schema, rowType);
Review comment:
Regardless of the answer to the question "is the partition key a subset of
the equality key", we probably don't need `CombinedKeySelector`:
* Yes: the equality key shuffle is then equivalent to the combined key
shuffle.
* No: we only need the equality key shuffle to maintain ordering. Combining
the equality key and the partition key in the shuffle won't bring the benefit
of reducing the number of data files (the benefit that hash distribution on
the partition key brings).
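
To make the two cases above concrete, here is a rough standalone sketch, not
the code in this PR. Everything in it is an assumption for illustration: the
`ShuffleSketch` class, the `Row` type with `id` as the equality key and
`region` as the partition key, and a plain hash-modulo `subtask` function
standing in for Flink's `keyBy` routing (which really goes through key
groups). It checks whether all rows with the same equality key reach one
subtask, and counts distinct (subtask, partition) pairs as a proxy for the
number of data files.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

class ShuffleSketch {
  // Hypothetical row: id is the equality key, region is the partition key.
  static final class Row {
    final int id;
    final String region;

    Row(int id, String region) {
      this.id = id;
      this.region = region;
    }
  }

  // Simplified stand-in for keyBy routing: hash of the key modulo parallelism.
  static int subtask(Object key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  // True if every row with the same id is routed to a single subtask,
  // which is what per-key ordering needs.
  static boolean sameIdSameSubtask(List<Row> rows, Function<Row, Object> keyFn, int parallelism) {
    Map<Integer, Integer> seen = new HashMap<>();
    for (Row r : rows) {
      int s = subtask(keyFn.apply(r), parallelism);
      Integer prev = seen.putIfAbsent(r.id, s);
      if (prev != null && prev != s) {
        return false;
      }
    }
    return true;
  }

  // Distinct (subtask, partition) pairs: a rough proxy for open data files.
  static int filesWritten(List<Row> rows, Function<Row, Object> keyFn, int parallelism) {
    Set<String> files = new HashSet<>();
    for (Row r : rows) {
      files.add(subtask(keyFn.apply(r), parallelism) + "/" + r.region);
    }
    return files.size();
  }

  public static void main(String[] args) {
    int parallelism = 4;
    Function<Row, Object> byEquality = r -> r.id;
    Function<Row, Object> byPartition = r -> r.region;
    Function<Row, Object> byCombined = r -> Objects.hash(r.id, r.region);

    // "No" case: the same id appears under two different partition values.
    List<Row> rows = List.of(
        new Row(1, "a"), new Row(1, "b"), new Row(2, "a"), new Row(3, "b"));

    System.out.println("equality keeps id together: "
        + sameIdSameSubtask(rows, byEquality, parallelism));
    System.out.println("combined keeps id together: "
        + sameIdSameSubtask(rows, byCombined, parallelism));
    System.out.println("files, partition shuffle: " + filesWritten(rows, byPartition, parallelism));
    System.out.println("files, equality shuffle:  " + filesWritten(rows, byEquality, parallelism));
    System.out.println("files, combined shuffle:  " + filesWritten(rows, byCombined, parallelism));
  }
}
```

In this simplified model, only the partition key shuffle guarantees one
subtask per partition value. The combined key gives no such guarantee, and
when the partition key is not derived from the equality key it can also send
rows with the same `id` to different subtasks.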
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]