I don't quite see why `StructTransformation` would preserve nesting. Aren't
you basically running something that would create a new row from an
existing one, optionally transforming the values at the same time? How
would you run a transformation and get back the original row? How would
nested fields work?

I think you might want to make it simpler by just using a SortKey, like you
mentioned.

On Sat, Jun 3, 2023 at 8:03 AM Steven Wu <stevenz...@gmail.com> wrote:

> Ryan, thanks a lot for the feedback. Will use `StructType` when
> applicable.
>
> `PartitionKey` is a combination of `StructProjection` and
> `StructTransformation` with a flattened array of partition tuples. This
> pattern of flattened arrays can also work for the SortOrder purpose. But it
> is not the `StructTransformation` that I had in mind earlier, where the
> original structure (like nesting) was maintained and only primitive types
> and values were transformed. If we go with the `PartitionKey` pattern,
> maybe we can call it `SortKey`.
>
> public class SortKey implements StructLike {
>     public SortKey(Schema schema, SortOrder sortOrder) {}
> }
>
> Originally, I was thinking about keeping `StructProjection` and
> `StructTransformation` separate. For SortOrder comparison, we can chain
> those two together: structTransformation.wrap(structProjection.wrap(...)).
>
> Any preference between the two choices? It probably boils down to if
> `StructTransformation` can be useful as a standalone class.
>
> On Fri, Jun 2, 2023 at 4:04 PM Ryan Blue <b...@tabular.io> wrote:
>
>> This all sounds pretty reasonable to me, although I'd use `StructType`
>> rather than `Schema` in most places so this is more reusable. I definitely
>> agree about reusing the existing tooling for `StructLike` rather than
>> re-implementing. I'd also recommend using sort order so you can use
>> transforms. Otherwise you'll just have to add it later.
>>
>> Also, check out how `PartitionKey` works because I think that's basically
>> the same thing as `StructTransformation`, just with a different name.
>>
>> On Thu, Jun 1, 2023 at 3:31 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Good point.
>>> Stick to the conventions then
>>>
>>> Steven Wu <stevenz...@gmail.com> ezt írta (időpont: 2023. máj. 31.,
>>> Sze, 17:14):
>>>
>>>> Peter,
>>>>
>>>> I also thought about that. Didn't go with `
>>>> StructTransformation.schema()`, because I was hoping to stick with the
>>>> `StructLike` interface which doesn't expose `schema()`. Trying to
>>>> mimic the behavior of `StructProjection`, which doesn't expose  `
>>>> schema()`. Projected schema can be extracted via `TypeUtil.project(Schema
>>>> schema, Set<Integer> fieldIds)`.
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> > 4. To represent the transformed struct, we need a transformed
>>>>> schema. I am thinking about adding a transform method to TypeUtil. It will
>>>>> return a transformed schema with field types updated to the result types 
>>>>> of
>>>>> the transforms. This can look a bit weird with field types changed.
>>>>> >
>>>>> > public static Schema transform(Schema schema, Map<Integer,
>>>>> Transform<?, ?>> idToTransforms)
>>>>>
>>>>> Wouldn't it make sense to get the Schema for the `StructTransformation
>>>>> ` object instead, like `StructTransformation.schema()`?
>>>>>
>>>>> Steven Wu <stevenz...@gmail.com> ezt írta (időpont: 2023. máj. 31.,
>>>>> Sze, 7:19):
>>>>>
>>>>>> We are implementing a range partitioner for Flink sink shuffling [1].
>>>>>> One key piece is RowDataComparator for Flink RowData. Would love to get
>>>>>> some feedback on a few decisions.
>>>>>>
>>>>>> 1. Comparators for Flink `RowData` type. Flink already has the
>>>>>> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
>>>>>> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
>>>>>> Then we don't need to implement `RowDataComparators` that look very 
>>>>>> similar
>>>>>> to struct `Comparators`. This is also related to the transformation
>>>>>> decision below. We don't need to re-implement all the transform functions
>>>>>> with Flink data types.
>>>>>>
>>>>>> 2. Use SortOrder or just natural orders (with null first). SortOrder
>>>>>> supports transform functions (like bucket, hours, truncate). The
>>>>>> implementation will be a lot simpler if we only need to implement natural
>>>>>> order without transformations from SortOrder. But I do think the
>>>>>> transformations (like days, bucket) in SortOrder are quite useful.
>>>>>>
>>>>>> In addition to the current transforms, we plan to add a
>>>>>> `relative_hour` transform for event time partitioned tables. Flink range
>>>>>> shuffle calculates traffic statistics across keys (like number of 
>>>>>> observed
>>>>>> rows per event hour). Ideally the traffic distributions should be
>>>>>> relatively stable. Hence relative hour (hour 0 meaning current hour) can
>>>>>> result in the stable statistics for traffic weight across the relative
>>>>>> event hours.
>>>>>>
>>>>>> 3. I am thinking about adding a `StructTransformation` class in the
>>>>>> iceberg-api module. It can be implemented similar to `StructProjection`
>>>>>> where transform functions are applied lazily during get.
>>>>>>
>>>>>> public static StructTransformation create(Schema schema, Map<Integer,
>>>>>> Transform<?, ?>> idToTransforms)
>>>>>>
>>>>>> 4. To represent the transformed struct, we need a transformed schema.
>>>>>> I am thinking about adding a transform method to TypeUtil. It will 
>>>>>> return a
>>>>>> transformed schema with field types updated to the result types of the
>>>>>> transforms. This can look a bit weird with field types changed.
>>>>>>
>>>>>> public static Schema transform(Schema schema, Map<Integer,
>>>>>> Transform<?, ?>> idToTransforms)
>>>>>>
>>>>>> =========================
>>>>>> This is how everything is put together for RowDataComparator.
>>>>>>
>>>>>> Schema projected = TypeUtil.select(schema, sortFieldIds); //
>>>>>> sortFieldIds set is calculated from SortOrder
>>>>>> Map<Integer, Transform<?, ?>> idToTransforms) idToTransforms = //
>>>>>> calculated from SortOrder
>>>>>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>>>>>
>>>>>> StructLike leftSortKey =
>>>>>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
>>>>>> StructLike rightSortKey =
>>>>>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
>>>>>>
>>>>>> Comparators.forType(sortSchema).compare(leftSortKey, rightSortKey)
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> [1]
>>>>>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
>>>>>>
>>>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>

-- 
Ryan Blue
Tabular

Reply via email to