[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-35384:
-----------------------------------
    Description: 
I am trying to implement a custom range partitioner in the Flink Iceberg sink. 
I want to publish some counter metrics for certain scenarios, similar to the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public `partitionCustom` 
method of `DataStream`:
{code}
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector)
{code}
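For illustration, a minimal range partitioner sketch, assuming `Long` keys and a 
sorted array of split points where each split point is the inclusive lower bound 
of the next range. To compile without Flink it declares a local stand-in 
`Partitioner` with the same `partition(K key, int numPartitions)` method as the 
Flink interface. `LongRangePartitioner`, the split points, and folding extra 
ranges onto channels with `%` are hypothetical choices, not the Iceberg sink's 
actual implementation.
{code:java}
import java.util.Arrays;

// Hypothetical stand-in for org.apache.flink.api.common.functions.Partitioner,
// declared locally so the sketch compiles without Flink on the classpath.
interface Partitioner<K> extends java.io.Serializable {
    int partition(K key, int numPartitions);
}

// Hypothetical range partitioner: split points {100, 200} define three ranges
// (-inf, 100), [100, 200) and [200, +inf).
class LongRangePartitioner implements Partitioner<Long> {
    private final long[] splitPoints;

    LongRangePartitioner(long[] splitPoints) {
        this.splitPoints = splitPoints.clone();
        Arrays.sort(this.splitPoints);
    }

    @Override
    public int partition(Long key, int numPartitions) {
        int idx = Arrays.binarySearch(splitPoints, key);
        // binarySearch returns (-(insertionPoint) - 1) when the key is absent;
        // an exact match belongs to the range that starts at that split point.
        int range = idx >= 0 ? idx + 1 : -idx - 1;
        // Assumption: fold ranges onto channels if there are more ranges than channels.
        return range % numPartitions;
    }
}
{code}
With the real Flink interface, the class would implement 
`org.apache.flink.api.common.functions.Partitioner<Long>` instead and be passed 
to `partitionCustom` together with a `KeySelector` that extracts the key.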

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, which our custom range partitioner would 
implement.

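The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility: a default 
method is not abstract, so the interface still has exactly one abstract method 
and existing lambdas and implementations keep compiling.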
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    // proposed: no-op by default, so existing partitioners are unaffected
    default void setup(TaskIOMetricGroup metrics) {}

    int partition(K key, int numPartitions);
}
{code}
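A self-contained sketch of why the default method keeps the interface 
lambda-compatible, and how an implementation could register a counter in 
`setup`. All names here are hypothetical stand-ins so the code runs without 
Flink: `MetricsStub` plays the role of `TaskIOMetricGroup` (with an `AtomicLong` 
in place of a Flink `Counter`), and `ProposedPartitioner`, `CountingPartitioner` 
and the `outOfRangeKeys` counter are illustrative only.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for TaskIOMetricGroup: only hands out named counters.
interface MetricsStub {
    AtomicLong counter(String name);
}

// Mirrors the proposed Partitioner: one abstract method plus a default no-op
// setup, so it is still a valid functional interface.
@FunctionalInterface
interface ProposedPartitioner<K> extends java.io.Serializable {
    default void setup(MetricsStub metrics) {}

    int partition(K key, int numPartitions);
}

// Hypothetical partitioner that overrides setup to register a counter and
// increments it whenever a key falls outside [0, numPartitions).
class CountingPartitioner implements ProposedPartitioner<Integer> {
    // transient: the partitioner is serialized, and setup would run afterwards on the task
    private transient AtomicLong outOfRangeKeys;

    @Override
    public void setup(MetricsStub metrics) {
        outOfRangeKeys = metrics.counter("outOfRangeKeys");
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        if (key < 0 || key >= numPartitions) {
            outOfRangeKeys.incrementAndGet(); // the scenario worth reporting as a metric
            return Math.floorMod(key, numPartitions);
        }
        return key;
    }
}
{code}
An existing lambda such as `(key, n) -> Math.floorMod(key.hashCode(), n)` still 
targets the interface, and only partitioners that want metrics override `setup`.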

I know that changing a public interface requires a FLIP process. I will start 
one if the community agrees with this feature request.

Personally, I think `numPartitions` should be passed to the `setup` method too. 
But that is a breaking change that is NOT worth the benefit right now.
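{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    // numPartitions is provided once at setup time
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}

    // breaking change: the per-record numPartitions argument is removed
    int partition(K key);
}
{code}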

That would mirror the `StreamPartitioner#setup()` method, which we would also 
need to modify to pass the metrics group:
{code}
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    protected int numberOfChannels;

    // proposed: metrics parameter added (ChannelSelector#setup would change accordingly)
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }

    // ...
}
{code}
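The wiring described in this proposal (the `StreamPartitioner` receives the 
metrics group and `CustomPartitionerWrapper` hands it to the user 
`Partitioner`) could look roughly like the sketch below. It uses hypothetical 
stand-in types (`MetricsStub`, `UserPartitioner`, `StreamPartitionerSketch`, 
`CustomPartitionerWrapperSketch`) instead of Flink's real classes, and a 
`java.util.function.Function` in place of `KeySelector`.
{code:java}
import java.util.function.Function;

// Hypothetical stand-in for TaskIOMetricGroup.
interface MetricsStub {}

// Stand-in for the proposed Partitioner with a default no-op setup.
interface UserPartitioner<K> {
    default void setup(MetricsStub metrics) {}

    int partition(K key, int numPartitions);
}

// Stand-in for StreamPartitioner with the proposed two-argument setup.
abstract class StreamPartitionerSketch<T> {
    protected int numberOfChannels;

    public void setup(int numberOfChannels, MetricsStub metrics) {
        this.numberOfChannels = numberOfChannels;
    }

    abstract int selectChannel(T record);
}

// Stand-in for CustomPartitionerWrapper: forwards the metrics group to the user partitioner.
class CustomPartitionerWrapperSketch<K, T> extends StreamPartitionerSketch<T> {
    private final UserPartitioner<K> partitioner;
    private final Function<T, K> keySelector;

    CustomPartitionerWrapperSketch(UserPartitioner<K> partitioner, Function<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public void setup(int numberOfChannels, MetricsStub metrics) {
        super.setup(numberOfChannels, metrics);
        partitioner.setup(metrics); // the new hook: user code gets the metrics group once
    }

    @Override
    int selectChannel(T record) {
        return partitioner.partition(keySelector.apply(record), numberOfChannels);
    }
}
{code}
Because `setup` has a default body, user partitioners that ignore metrics need 
no changes; the wrapper forwards the call unconditionally.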



> Expose metrics group to custom Partitioner
> ------------------------------------------
>
>                 Key: FLINK-35384
>                 URL: https://issues.apache.org/jira/browse/FLINK-35384
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.9.4
>            Reporter: Steven Zhen Wu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
