This all sounds pretty reasonable to me, although I'd use `StructType` rather than `Schema` in most places so this is more reusable. I definitely agree about reusing the existing tooling for `StructLike` rather than re-implementing it. I'd also recommend using sort order so you can use transforms; otherwise you'll just have to add it later.

Also, check out how `PartitionKey` works, because I think that's basically the same thing as `StructTransformation`, just with a different name.
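For reference, this is roughly how `PartitionKey` applies a spec's transforms to a row; `StructTransformation` could follow the same pattern. A minimal sketch, assuming a schema with hypothetical `id` and `event_ts` fields (`row` is any existing `StructLike`):

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "event_ts", Types.TimestampType.withZone()));

PartitionSpec spec = PartitionSpec.builderFor(schema)
    .day("event_ts")
    .bucket("id", 16)
    .build();

PartitionKey key = new PartitionKey(spec, schema);
key.partition(row);                          // evaluates each transform against the row
Integer day = key.get(0, Integer.class);     // day ordinal of event_ts
Integer bucket = key.get(1, Integer.class);  // bucket of id

`PartitionKey` is itself a `StructLike`, so the transformed values can be read back positionally, which is the shape the comparator needs.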
On Thu, Jun 1, 2023 at 3:31 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Good point.
> Stick to the conventions then
>
> On Wed, May 31, 2023 at 5:14 PM Steven Wu <stevenz...@gmail.com> wrote:
>
>> Peter,
>>
>> I also thought about that. I didn't go with `StructTransformation.schema()` because I was hoping to stick with the `StructLike` interface, which doesn't expose `schema()`, and to mimic the behavior of `StructProjection`, which doesn't expose `schema()` either. The projected schema can be extracted via `TypeUtil.project(Schema schema, Set<Integer> fieldIds)`.
>>
>> Thanks,
>> Steven
>>
>> On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> > 4. To represent the transformed struct, we need a transformed schema. I am thinking about adding a transform method to TypeUtil. It will return a transformed schema with field types updated to the result types of the transforms. This can look a bit weird with field types changed.
>>> >
>>> > public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>>
>>> Wouldn't it make more sense to get the Schema from the `StructTransformation` object instead, like `StructTransformation.schema()`?
>>>
>>> On Wed, May 31, 2023 at 7:19 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> We are implementing a range partitioner for Flink sink shuffling [1]. One key piece is a RowDataComparator for Flink RowData. Would love to get some feedback on a few decisions.
>>>>
>>>> 1. Comparators for the Flink `RowData` type. Flink already has the `RowDataWrapper` class, which can wrap a `RowData` as a `StructLike`. With `StructLike`, the Iceberg `Comparators` can be used to compare two structs, so we don't need to implement `RowDataComparators` that look very similar to the struct `Comparators`. This is also related to the transformation decision below: we don't need to re-implement all the transform functions for Flink data types.
>>>>
>>>> 2. Use SortOrder or just natural order (with nulls first). SortOrder supports transform functions (like bucket, hours, truncate). The implementation would be a lot simpler if we only needed to implement natural order without the transformations from SortOrder, but I do think the transformations (like days, bucket) in SortOrder are quite useful.
>>>>
>>>> In addition to the current transforms, we plan to add a `relative_hour` transform for event-time partitioned tables. Flink range shuffling calculates traffic statistics across keys (like the number of observed rows per event hour). Ideally the traffic distribution should be relatively stable, and a relative hour (hour 0 meaning the current hour) can produce stable statistics for traffic weight across the relative event hours.
>>>>
>>>> 3. I am thinking about adding a `StructTransformation` class in the iceberg-api module. It can be implemented similarly to `StructProjection`, where transform functions are applied lazily during get.
>>>>
>>>> public static StructTransformation create(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>>>
>>>> 4. To represent the transformed struct, we need a transformed schema. I am thinking about adding a transform method to TypeUtil. It will return a transformed schema with field types updated to the result types of the transforms. This can look a bit weird with field types changed.
>>>>
>>>> public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>>>
>>>> =========================
>>>> This is how everything is put together for RowDataComparator:
>>>>
>>>> Schema projected = TypeUtil.select(schema, sortFieldIds); // the sortFieldIds set is calculated from SortOrder
>>>> Map<Integer, Transform<?, ?>> idToTransforms = // calculated from SortOrder
>>>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>>>
>>>> StructLike leftSortKey = structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
>>>> StructLike rightSortKey = structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));
>>>>
>>>> Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> [1] https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
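One more concrete note: both "calculated from SortOrder" placeholders in the sketch above fall out of SortField's public accessors. A minimal sketch, assuming a sort order built with transforms over the hypothetical `id` and `event_ts` fields from earlier:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.transforms.Transform;

// schema is the table schema from the earlier sketch (assumed)
SortOrder sortOrder = SortOrder.builderFor(schema)
    .asc(Expressions.bucket("id", 16))
    .asc(Expressions.day("event_ts"), NullOrder.NULLS_FIRST)
    .build();

Set<Integer> sortFieldIds = new HashSet<>();
Map<Integer, Transform<?, ?>> idToTransforms = new HashMap<>();
for (SortField field : sortOrder.fields()) {
  sortFieldIds.add(field.sourceId());                       // feeds TypeUtil.select(schema, sortFieldIds)
  idToTransforms.put(field.sourceId(), field.transform());  // feeds the proposed TypeUtil.transform
}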
--
Ryan Blue
Tabular