Good point.
Let's stick to the conventions, then.

Steven Wu <stevenz...@gmail.com> wrote (on Wed, May 31, 2023, 17:14):

> Peter,
>
> I also thought about that. I didn't go with `StructTransformation.schema()`
> because I was hoping to stick with the `StructLike` interface, which
> doesn't expose `schema()`. I am trying to mimic the behavior of
> `StructProjection`, which doesn't expose `schema()` either. The projected
> schema can be extracted via
> `TypeUtil.project(Schema schema, Set<Integer> fieldIds)`.
>
> Thanks,
> Steven
>
> On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> > 4. To represent the transformed struct, we need a transformed schema. I
>> > am thinking about adding a transform method to TypeUtil. It will return a
>> > transformed schema with field types updated to the result types of the
>> > transforms. This can look a bit weird with field types changed.
>> >
>> > public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>
>> Wouldn't it make sense to get the Schema from the `StructTransformation`
>> object instead, like `StructTransformation.schema()`?
>>
>> Steven Wu <stevenz...@gmail.com> wrote (on Wed, May 31, 2023, 7:19):
>>
>>> We are implementing a range partitioner for Flink sink shuffling [1].
>>> One key piece is RowDataComparator for Flink RowData. Would love to get
>>> some feedback on a few decisions.
>>>
>>> 1. Comparators for Flink `RowData` type. Flink already has the
>>> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
>>> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
>>> Then we don't need to implement `RowDataComparators` that look very similar
>>> to struct `Comparators`. This is also related to the transformation
>>> decision below. We don't need to re-implement all the transform functions
>>> with Flink data types.
>>>
>>> 2. Use SortOrder or just natural orders (with null first). SortOrder
>>> supports transform functions (like bucket, hours, truncate). The
>>> implementation will be a lot simpler if we only need to implement natural
>>> order without transformations from SortOrder. But I do think the
>>> transformations (like days, bucket) in SortOrder are quite useful.
>>>
>>> In addition to the current transforms, we plan to add a `relative_hour`
>>> transform for event time partitioned tables. Flink range shuffle calculates
>>> traffic statistics across keys (like number of observed rows per event
>>> hour). Ideally, the traffic distribution should be relatively stable. Hence
>>> a relative hour (hour 0 meaning the current hour) can produce stable
>>> statistics for traffic weight across the relative event hours.
>>>
>>> 3. I am thinking about adding a `StructTransformation` class in the
>>> iceberg-api module. It can be implemented similarly to `StructProjection`,
>>> where transform functions are applied lazily during get.
>>>
>>> public static StructTransformation create(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>>
>>> 4. To represent the transformed struct, we need a transformed schema. I
>>> am thinking about adding a transform method to TypeUtil. It will return a
>>> transformed schema with field types updated to the result types of the
>>> transforms. This can look a bit weird with field types changed.
>>>
>>> public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>>
>>> =========================
>>> This is how everything is put together for RowDataComparator.
>>>
>>> // the sortFieldIds set is calculated from SortOrder
>>> Schema projected = TypeUtil.select(schema, sortFieldIds);
>>> Map<Integer, Transform<?, ?>> idToTransforms = ...; // calculated from SortOrder
>>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>>
>>> StructLike leftSortKey =
>>>     structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
>>> StructLike rightSortKey =
>>>     structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));
>>>
>>> Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);
>>>
>>> Thanks,
>>> Steven
>>>
>>> [1]
>>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
>>>
>>
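
For readers following the thread, here is a minimal, self-contained sketch of
the lazy "apply the transform during get" idea from point 3, combined with a
null-first natural-order comparison. It does not use the Iceberg API:
`SimpleStruct`, `ArrayStruct`, `LazyTransformation`, `relativeHour`, and
`compare` are all hypothetical stand-ins invented for illustration, and the
relative-hour formula (event hour minus current hour) is an assumption based
on the "hour 0 meaning current hour" description.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StructTransformationSketch {

  // Hypothetical stand-in for a positional struct interface (not Iceberg's StructLike).
  interface SimpleStruct {
    int size();
    Object get(int pos);
  }

  // Trivial array-backed struct used as the "row" in this sketch.
  static final class ArrayStruct implements SimpleStruct {
    private final Object[] values;
    ArrayStruct(Object... values) { this.values = values; }
    public int size() { return values.length; }
    public Object get(int pos) { return values[pos]; }
  }

  // Reusable wrapper: transforms are applied lazily, only when get(pos) is called.
  static final class LazyTransformation implements SimpleStruct {
    private final Map<Integer, Function<Object, Object>> posToTransform;
    private SimpleStruct wrapped;

    LazyTransformation(Map<Integer, Function<Object, Object>> posToTransform) {
      this.posToTransform = posToTransform;
    }

    // Re-points this wrapper at a new row and returns the same instance.
    LazyTransformation wrap(SimpleStruct struct) {
      this.wrapped = struct;
      return this;
    }

    public int size() { return wrapped.size(); }

    public Object get(int pos) {
      Object value = wrapped.get(pos);
      Function<Object, Object> transform = posToTransform.get(pos);
      // Nulls and untransformed positions pass through unchanged.
      return (value == null || transform == null) ? value : transform.apply(value);
    }
  }

  // Assumed relative-hour transform: 0 is the current hour, -1 the previous one.
  static Function<Object, Object> relativeHour(long currentHour) {
    return hour -> Long.valueOf(((Long) hour) - currentHour);
  }

  // Field-by-field comparison in natural order, with nulls sorted first.
  @SuppressWarnings("unchecked")
  static int compare(SimpleStruct left, SimpleStruct right) {
    for (int pos = 0; pos < left.size(); pos++) {
      Object l = left.get(pos);
      Object r = right.get(pos);
      int cmp;
      if (l == null || r == null) {
        cmp = (l == null) ? (r == null ? 0 : -1) : 1;
      } else {
        cmp = ((Comparable<Object>) l).compareTo(r);
      }
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    Map<Integer, Function<Object, Object>> transforms = new HashMap<>();
    transforms.put(0, relativeHour(100L)); // position 0 holds the event hour

    // Each side gets its own wrapper, because wrap() reuses the instance.
    SimpleStruct left = new LazyTransformation(transforms).wrap(new ArrayStruct(98L, "a"));
    SimpleStruct right = new LazyTransformation(transforms).wrap(new ArrayStruct(99L, "b"));

    System.out.println(left.get(0));          // -2
    System.out.println(compare(left, right)); // negative: left sorts first
  }
}
```

One design point this sketch makes visible: because `wrap` returns the same
wrapper instance, the two sides of a comparison need separate wrappers (or the
values must be read before re-wrapping); otherwise both keys would refer to
the same underlying row.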
