We are implementing a range partitioner for Flink sink shuffling [1]. One
key piece is RowDataComparator for Flink RowData. Would love to get some
feedback on a few decisions.

1. Comparators for Flink `RowData` type. The Iceberg Flink module already
has the `RowDataWrapper` class, which wraps a `RowData` as a `StructLike`.
With `StructLike`, the Iceberg `Comparators` can be used to compare two
structs, so we don't need to implement `RowDataComparators` that would look
very similar to the struct `Comparators`. This also ties into the
transformation decision below: we wouldn't need to re-implement all the
transform functions for Flink data types.
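
To illustrate the wrap-then-compare idea, here is a minimal sketch in plain
Java. It does not use the Iceberg or Flink classes: `SimpleStruct`,
`EngineRow`, `EngineRowWrapper` and `SimpleStructComparator` are hypothetical
stand-ins for `StructLike`, `RowData`, `RowDataWrapper` and the struct
`Comparators`, and the comparison assumes natural order with nulls first.

```java
import java.util.Comparator;

// Hypothetical stand-in for Iceberg's StructLike: positional access to field values.
interface SimpleStruct {
  int size();

  Object get(int pos);
}

// Hypothetical stand-in for an engine-specific row (the role Flink RowData plays).
final class EngineRow {
  final Object[] values;

  EngineRow(Object... values) {
    this.values = values;
  }
}

// Reusable adapter in the spirit of RowDataWrapper: wrap() swaps the row and returns this.
final class EngineRowWrapper implements SimpleStruct {
  private EngineRow row;

  EngineRowWrapper wrap(EngineRow newRow) {
    this.row = newRow;
    return this;
  }

  @Override
  public int size() {
    return row.values.length;
  }

  @Override
  public Object get(int pos) {
    return row.values[pos];
  }
}

// Engine-agnostic comparator: field by field, natural order, nulls first.
final class SimpleStructComparator implements Comparator<SimpleStruct> {
  @Override
  public int compare(SimpleStruct left, SimpleStruct right) {
    for (int pos = 0; pos < left.size(); pos++) {
      int cmp = compareField(left.get(pos), right.get(pos));
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private static int compareField(Object left, Object right) {
    if (left == null || right == null) {
      // null sorts before any non-null value
      return left == null ? (right == null ? 0 : -1) : 1;
    }
    return ((Comparable) left).compareTo(right);
  }
}
```

Because the wrapper is mutable and reused, the two sides of a comparison
each need their own wrapper instance.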

2. Use SortOrder or just natural orders (with null first). SortOrder
supports transform functions (like bucket, hours, truncate). The
implementation will be a lot simpler if we only need to implement natural
order without transformations from SortOrder. But I do think the
transformations (like days, bucket) in SortOrder are quite useful.
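
To make the trade-off concrete, here is a minimal sketch contrasting natural
order (nulls first) with ordering by a transformed value. It is plain Java,
not the Iceberg API: `SortOrderSketch`, `days`, `NATURAL` and `BY_DAY` are
hypothetical names, and timestamps are assumed to be microseconds since
epoch.

```java
import java.util.Comparator;

final class SortOrderSketch {
  static final long MICROS_PER_DAY = 86_400_000_000L;

  // Days since epoch; floor division keeps pre-1970 timestamps in the right day.
  static int days(long timestampMicros) {
    return (int) Math.floorDiv(timestampMicros, MICROS_PER_DAY);
  }

  // Natural order on the raw timestamp, nulls first.
  static final Comparator<Long> NATURAL =
      Comparator.nullsFirst(Comparator.<Long>naturalOrder());

  // Order by the transformed value only, nulls first.
  static final Comparator<Long> BY_DAY =
      Comparator.nullsFirst(Comparator.<Long>comparingInt(SortOrderSketch::days));
}
```

Two timestamps within the same day are distinct keys under `NATURAL` but
compare as equal under `BY_DAY`, so the transformed order works with far
fewer distinct sort keys.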

In addition to the current transforms, we plan to add a `relative_hour`
transform for event time partitioned tables. Flink range shuffle calculates
traffic statistics across keys (like number of observed rows per event
hour). Ideally the traffic distribution should be relatively stable. With
absolute event hours the set of keys shifts every hour, whereas relative
hours (hour 0 meaning the current hour) should give stable traffic weight
statistics across the relative event hours.
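
A minimal sketch of what such a transform could compute, under assumptions
that are not settled in this proposal: timestamps are microseconds since
epoch, the reference time is passed in explicitly, and past hours are
negative. `RelativeHourSketch` and its methods are hypothetical names.

```java
final class RelativeHourSketch {
  static final long MICROS_PER_HOUR = 3_600_000_000L;

  // Hours since epoch, using floor division so pre-1970 values round down.
  static long hours(long timestampMicros) {
    return Math.floorDiv(timestampMicros, MICROS_PER_HOUR);
  }

  // 0 for the current hour, -1 for the previous hour, +1 for the next hour.
  static int relativeHour(long eventMicros, long nowMicros) {
    return (int) (hours(eventMicros) - hours(nowMicros));
  }
}
```

With this definition, an event from the previous clock hour always maps to
key -1 regardless of when the job runs, which is what keeps the per-key
statistics comparable over time.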

3. I am thinking about adding a `StructTransformation` class in the
iceberg-api module. It can be implemented similar to `StructProjection`
where transform functions are applied lazily during get.

public static StructTransformation create(
    Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
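
A minimal sketch of the lazy behavior, in plain Java rather than against the
Iceberg API. `PositionalStruct`, `ArrayStruct` and `LazyStructTransformation`
are hypothetical names; for brevity the transforms are keyed by position
instead of field id, and null values are assumed to pass through unchanged.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for StructLike.
interface PositionalStruct {
  int size();

  Object get(int pos);
}

// Simple array-backed struct used as the wrapped input.
final class ArrayStruct implements PositionalStruct {
  private final Object[] values;

  ArrayStruct(Object... values) {
    this.values = values;
  }

  @Override
  public int size() {
    return values.length;
  }

  @Override
  public Object get(int pos) {
    return values[pos];
  }
}

// Applies the transform for a position only when that position is read.
final class LazyStructTransformation implements PositionalStruct {
  private final List<Function<Object, Object>> transforms;
  private PositionalStruct wrapped;

  LazyStructTransformation(List<Function<Object, Object>> transforms) {
    this.transforms = transforms;
  }

  // Reusable like StructProjection: swaps the wrapped struct and returns this.
  LazyStructTransformation wrap(PositionalStruct struct) {
    this.wrapped = struct;
    return this;
  }

  @Override
  public int size() {
    return transforms.size();
  }

  @Override
  public Object get(int pos) {
    Object value = wrapped.get(pos);
    // assumption: null input yields null output without calling the transform
    return value == null ? null : transforms.get(pos).apply(value);
  }
}
```

Nothing is computed in `wrap`, so fields that the comparator never reaches
(because an earlier field already differs) are never transformed.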

4. To represent the transformed struct, we need a transformed schema. I am
thinking about adding a transform method to TypeUtil. It will return a
transformed schema with field types updated to the result types of the
transforms. This can look a bit weird with field types changed.

public static Schema transform(
    Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
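
A minimal sketch of the schema rewrite, again in plain Java. `FieldSketch`
and `SchemaTransformSketch` are hypothetical names, types are represented as
strings, and each map entry stands in for what `Transform.getResultType`
would return for the source type. Fields without a transform keep their type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a schema field: id, name, and type name.
final class FieldSketch {
  final int id;
  final String name;
  final String type;

  FieldSketch(int id, String name, String type) {
    this.id = id;
    this.name = name;
    this.type = type;
  }
}

final class SchemaTransformSketch {
  // Returns a new field list where transformed fields carry the result type.
  static List<FieldSketch> transform(
      List<FieldSketch> schema, Map<Integer, UnaryOperator<String>> idToResultType) {
    List<FieldSketch> result = new ArrayList<>();
    for (FieldSketch field : schema) {
      UnaryOperator<String> resultType = idToResultType.get(field.id);
      String newType = resultType == null ? field.type : resultType.apply(field.type);
      // field id and name are preserved; only the type changes
      result.add(new FieldSketch(field.id, field.name, newType));
    }
    return result;
  }
}
```

For example, a `long` field with a bucket transform would come back as an
`int` field with the same id and name.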

=========================
This is how everything is put together for RowDataComparator.

// sortFieldIds set is calculated from SortOrder
Schema projected = TypeUtil.select(schema, sortFieldIds);
// idToTransforms is calculated from SortOrder
Map<Integer, Transform<?, ?>> idToTransforms = ...;
Schema sortSchema = TypeUtil.transform(projected, idToTransforms);

// each side needs its own wrapper chain, since wrap() reuses the instance
StructLike leftSortKey =
    leftTransformation.wrap(leftProjection.wrap(leftWrapper.wrap(leftRowData)));
StructLike rightSortKey =
    rightTransformation.wrap(rightProjection.wrap(rightWrapper.wrap(rightRowData)));

Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);

Thanks,
Steven

[1]
https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
