We are implementing a range partitioner for Flink sink shuffling [1]. One key piece is a `RowDataComparator` for Flink `RowData`. Would love to get some feedback on a few decisions.
1. Comparators for the Flink `RowData` type. Flink already has the `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With `StructLike`, the Iceberg `Comparators` can be used to compare two structs, so we don't need to implement `RowDataComparators` that would look very similar to the struct `Comparators`. This is also related to the transform decision below: we wouldn't need to re-implement all the transform functions for Flink data types.

2. Use SortOrder or just natural order (with nulls first). SortOrder supports transform functions (like bucket, hours, truncate). The implementation would be a lot simpler if we only needed to support natural order without the transforms from SortOrder, but I do think the transforms (like days, bucket) in SortOrder are quite useful. In addition to the current transforms, we plan to add a `relative_hour` transform for event-time partitioned tables. Flink range shuffle calculates traffic statistics across keys (like the number of observed rows per event hour), and ideally the traffic distributions should be relatively stable. A relative hour (hour 0 meaning the current hour) keeps the statistics for traffic weight stable across the relative event hours (a small sketch of the idea is appended after the references).

3. I am thinking about adding a `StructTransformation` class to the iceberg-api module. It could be implemented similarly to `StructProjection`, with transform functions applied lazily during get (a sketch is appended after the references):

    public static StructTransformation create(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)

4. To represent the transformed struct, we need a transformed schema. I am thinking about adding a transform method to TypeUtil that returns a transformed schema with field types updated to the result types of the transforms. It can look a bit odd that field types change (e.g. a timestamp field sorted by hours becomes an int), but that matches what the comparator actually sees:

    public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)

=========================

This is how everything is put together for RowDataComparator:

    // sortFieldIds is calculated from the SortOrder
    Schema projected = TypeUtil.select(schema, sortFieldIds);
    Map<Integer, Transform<?, ?>> idToTransforms = ...; // calculated from the SortOrder
    Schema sortSchema = TypeUtil.transform(projected, idToTransforms);

    // note: the left and right sides each need their own wrapper instances,
    // since wrap() mutates the wrapper's state
    StructLike leftSortKey =
        structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
    StructLike rightSortKey =
        structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));

    return Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);

Thanks,
Steven

[1] https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
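To make the `relative_hour` idea from point 2 concrete, here is a minimal sketch. The transform doesn't exist in Iceberg yet, so the class and method names are hypothetical; the point is only the key-stability property:

    import java.time.Instant;

    // Hypothetical sketch of relative_hour: hour 0 is the current hour,
    // hour 1 the previous hour, and so on. Because the key is relative to
    // processing time, the distribution of rows per key stays roughly
    // stable as wall-clock time advances, which keeps the sampled traffic
    // statistics stable.
    public class RelativeHourSketch {
      private RelativeHourSketch() {
      }

      static int relativeHour(Instant eventTime, Instant now) {
        long eventHour = Math.floorDiv(eventTime.getEpochSecond(), 3600);
        long currentHour = Math.floorDiv(now.getEpochSecond(), 3600);
        return (int) (currentHour - eventHour);
      }

      public static void main(String[] args) {
        Instant now = Instant.parse("2023-06-01T10:30:00Z");
        // an event in the current hour maps to 0
        System.out.println(relativeHour(Instant.parse("2023-06-01T10:05:00Z"), now)); // 0
        // an event two hours ago maps to 2, regardless of the absolute hour
        System.out.println(relativeHour(Instant.parse("2023-06-01T08:59:00Z"), now)); // 2
      }
    }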
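And a sketch of the lazy-transform idea behind the proposed `StructTransformation` from point 3. To keep it self-contained, it uses `java.util.function.Function` keyed by field position instead of Iceberg's `Transform` keyed by field id, and skips the `create(Schema, Map)` factory; `StructLike` is the only real Iceberg interface here:

    import java.util.Map;
    import java.util.function.Function;
    import org.apache.iceberg.StructLike;

    // Sketch of StructTransformation: like StructProjection, it wraps a
    // struct and applies the transform lazily inside get(), so no
    // transformed copy of the row is ever materialized.
    public class StructTransformationSketch implements StructLike {
      private final Map<Integer, Function<Object, Object>> posToTransform;
      private StructLike wrapped = null;

      public StructTransformationSketch(Map<Integer, Function<Object, Object>> posToTransform) {
        this.posToTransform = posToTransform;
      }

      public StructTransformationSketch wrap(StructLike struct) {
        this.wrapped = struct;
        return this;
      }

      @Override
      public int size() {
        return wrapped.size();
      }

      @Override
      public <T> T get(int pos, Class<T> javaClass) {
        Object value = wrapped.get(pos, Object.class);
        Function<Object, Object> transform = posToTransform.get(pos);
        // untransformed positions pass through unchanged
        return javaClass.cast(transform != null ? transform.apply(value) : value);
      }

      @Override
      public <T> void set(int pos, T value) {
        throw new UnsupportedOperationException("StructTransformation is read-only");
      }
    }

Applying the transform inside get() matters here because the comparator runs per record in the shuffle, and only the fields the comparison actually touches get transformed.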