This all sounds pretty reasonable to me, although I'd use `StructType`
rather than `Schema` in most places so this is more reusable. I definitely
agree about reusing the existing tooling for `StructLike` rather than
re-implementing. I'd also recommend using sort order so you can use
transforms. Otherwise you'll just have to add it later.

Also, check out how `PartitionKey` works because I think that's basically
the same thing as `StructTransformation`, just with a different name.

On Thu, Jun 1, 2023 at 3:31 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Good point.
> Stick to the conventions then
>
> Steven Wu <stevenz...@gmail.com> ezt írta (időpont: 2023. máj. 31., Sze,
> 17:14):
>
>> Peter,
>>
>> I also thought about that. Didn't go with `StructTransformation.schema()`,
>> because I was hoping to stick with the `StructLike` interface which
>> doesn't expose `schema()`. Trying to mimic the behavior of `
>> StructProjection`, which doesn't expose  `schema()`. Projected schema
>> can be extracted via `TypeUtil.project(Schema schema, Set<Integer>
>> fieldIds)`.
>>
>> Thanks,
>> Steven
>>
>> On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> > 4. To represent the transformed struct, we need a transformed schema.
>>> I am thinking about adding a transform method to TypeUtil. It will return a
>>> transformed schema with field types updated to the result types of the
>>> transforms. This can look a bit weird with field types changed.
>>> >
>>> > public static Schema transform(Schema schema, Map<Integer,
>>> Transform<?, ?>> idToTransforms)
>>>
>>> Wouldn't it make sense to get the Schema for the `StructTransformation` 
>>> object
>>> instead, like `StructTransformation.schema()`?
>>>
>>> Steven Wu <stevenz...@gmail.com> ezt írta (időpont: 2023. máj. 31.,
>>> Sze, 7:19):
>>>
>>>> We are implementing a range partitioner for Flink sink shuffling [1].
>>>> One key piece is RowDataComparator for Flink RowData. Would love to get
>>>> some feedback on a few decisions.
>>>>
>>>> 1. Comparators for Flink `RowData` type. Flink already has the
>>>> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
>>>> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
>>>> Then we don't need to implement `RowDataComparators` that look very similar
>>>> to struct `Comparators`. This is also related to the transformation
>>>> decision below. We don't need to re-implement all the transform functions
>>>> with Flink data types.
>>>>
>>>> 2. Use SortOrder or just natural orders (with null first). SortOrder
>>>> supports transform functions (like bucket, hours, truncate). The
>>>> implementation will be a lot simpler if we only need to implement natural
>>>> order without transformations from SortOrder. But I do think the
>>>> transformations (like days, bucket) in SortOrder are quite useful.
>>>>
>>>> In addition to the current transforms, we plan to add a `relative_hour`
>>>> transform for event time partitioned tables. Flink range shuffle calculates
>>>> traffic statistics across keys (like number of observed rows per event
>>>> hour). Ideally the traffic distributions should be relatively stable. Hence
>>>> relative hour (hour 0 meaning current hour) can result in the stable
>>>> statistics for traffic weight across the relative event hours.
>>>>
>>>> 3. I am thinking about adding a `StructTransformation` class in the
>>>> iceberg-api module. It can be implemented similar to `StructProjection`
>>>> where transform functions are applied lazily during get.
>>>>
>>>> public static StructTransformation create(Schema schema, Map<Integer,
>>>> Transform<?, ?>> idToTransforms)
>>>>
>>>> 4. To represent the transformed struct, we need a transformed schema. I
>>>> am thinking about adding a transform method to TypeUtil. It will return a
>>>> transformed schema with field types updated to the result types of the
>>>> transforms. This can look a bit weird with field types changed.
>>>>
>>>> public static Schema transform(Schema schema, Map<Integer, Transform<?,
>>>> ?>> idToTransforms)
>>>>
>>>> =========================
>>>> This is how everything is put together for RowDataComparator.
>>>>
>>>> Schema projected = TypeUtil.select(schema, sortFieldIds); //
>>>> sortFieldIds set is calculated from SortOrder
>>>> Map<Integer, Transform<?, ?>> idToTransforms) idToTransforms = //
>>>> calculated from SortOrder
>>>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>>>
>>>> StructLike leftSortKey =
>>>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
>>>> StructLike rightSortKey =
>>>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
>>>>
>>>> Comparators.forType(sortSchema).compare(leftSortKey, rightSortKey)
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> [1]
>>>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
>>>>
>>>

-- 
Ryan Blue
Tabular

Reply via email to