[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848126#comment-17848126 ]
Zhu Zhu commented on FLINK-35384:
---------------------------------
Enabling metrics for partitioners makes sense, and the proposed approach of
introducing a context sounds good.
How about introducing a sub-metric group specifically for partitioner metrics?
A single task may contain multiple partitioners whose metrics should not get
mixed. A sub-group also avoids exposing the internal TaskIOMetricGroup class to
users.
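A minimal sketch of the sub-group idea, written as plain Java so it runs on its
own. `Counter`, `PartitionerMetricGroup`, `PartitionerContext`, `TaskMetrics`
and the metric name `outOfRangeKeys` are all hypothetical stand-ins, not Flink
classes. The only point illustrated is that each partitioner receives its own
scoped group, so two partitioners in one task cannot mix their metrics.

{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Self-contained sketch: every type here is a hypothetical stand-in, NOT a Flink class.
public class PartitionerMetricsSketch {

    /** Hypothetical counter handed to user code. */
    interface Counter {
        void inc();

        long getCount();
    }

    /** Hypothetical sub-group: the only metric surface a partitioner sees. */
    interface PartitionerMetricGroup {
        Counter counter(String name);
    }

    /** Hypothetical init context passed to a partitioner's setup. */
    interface PartitionerContext {
        PartitionerMetricGroup metricGroup();
    }

    /** Stand-in for the task-level registry; keys are "partitioner.<id>.<name>". */
    static final class TaskMetrics {
        final Map<String, AtomicLong> values = new TreeMap<>();

        /** Returns a group scoped to one partitioner, so names cannot collide. */
        PartitionerMetricGroup partitionerGroup(String partitionerId) {
            return name -> {
                AtomicLong value = values.computeIfAbsent(
                        "partitioner." + partitionerId + "." + name, k -> new AtomicLong());
                return new Counter() {
                    @Override
                    public void inc() {
                        value.incrementAndGet();
                    }

                    @Override
                    public long getCount() {
                        return value.get();
                    }
                };
            };
        }
    }

    public static void main(String[] args) {
        TaskMetrics task = new TaskMetrics();
        // Two partitioners in the same task get separate sub-groups.
        PartitionerContext first = () -> task.partitionerGroup("0");
        PartitionerContext second = () -> task.partitionerGroup("1");

        first.metricGroup().counter("outOfRangeKeys").inc();
        first.metricGroup().counter("outOfRangeKeys").inc();
        second.metricGroup().counter("outOfRangeKeys").inc();

        // Prints {partitioner.0.outOfRangeKeys=2, partitioner.1.outOfRangeKeys=1}
        System.out.println(task.values);
    }
}
{code}

Because user code only ever receives the scoped group, the task-level registry
stays internal, which addresses the concern about exposing TaskIOMetricGroup.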
> Expose TaskIOMetricGroup to custom Partitioner via init Context
> ---------------------------------------------------------------
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Affects Versions: 1.9.4
> Reporter: Steven Zhen Wu
> Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg
> sink and want to publish some counter metrics for certain scenarios, similar
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from
> `DataStream`.
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that
> `CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the
> public `Partitioner` interface, which is where we can implement the custom
> range partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     // proposed: a no-op default, so existing implementations and lambdas still compile
>     default void setup(TaskIOMetricGroup metrics) {}
>
>     int partition(K key, int numPartitions);
> }
> {code}
> I know that changing a public interface requires a FLIP process. I will do
> that if the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method
> too. But that is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we
> would need to modify to pass the metrics group.
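> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>,
>                 Serializable {
>
>     protected int numberOfChannels;
>
>     // ChannelSelector#setup would need the same extra parameter for @Override to compile
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
>
>     // ... remaining members unchanged
> }
> {code}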
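A self-contained sketch of the proposal in the description, assuming simplified
stand-ins: `Partitioner`, `CustomPartitionerWrapper` and `MetricsStub` only
mirror the shape of the classes discussed (the real Flink `Partitioner` also
extends `Function`, and the real wrapper extends `StreamPartitioner`), and
`RangePartitioner` with its clamped-key counter is a hypothetical scenario. It
shows that a lambda written against the single-method interface still compiles
once `setup` has a no-op default, and how the wrapper would forward the metrics
object from its own `setup` call.

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Self-contained sketch: these are simplified stand-ins, NOT Flink's real classes.
public class DefaultSetupSketch {

    /** Hypothetical stand-in for the metric group passed to setup(). */
    static final class MetricsStub {
        final AtomicLong clampedKeys = new AtomicLong();
    }

    /** Mirrors the proposal: one abstract method plus a no-op default setup(). */
    @FunctionalInterface
    interface Partitioner<K> extends java.io.Serializable {
        default void setup(MetricsStub metrics) {}

        int partition(K key, int numPartitions);
    }

    /** Simplified wrapper that forwards setup() to the user partitioner. */
    static final class CustomPartitionerWrapper<K> {
        private final Partitioner<K> partitioner;
        private int numberOfChannels;

        CustomPartitionerWrapper(Partitioner<K> partitioner) {
            this.partitioner = partitioner;
        }

        void setup(int numberOfChannels, MetricsStub metrics) {
            this.numberOfChannels = numberOfChannels;
            partitioner.setup(metrics); // no-op for partitioners that don't override it
        }

        int selectChannel(K key) {
            return partitioner.partition(key, numberOfChannels);
        }
    }

    /** Hypothetical range partitioner that counts keys beyond its last bound. */
    static final class RangePartitioner implements Partitioner<Integer> {
        private final int[] upperBounds; // exclusive upper bounds, ascending
        private transient MetricsStub metrics;

        RangePartitioner(int... upperBounds) {
            this.upperBounds = upperBounds;
        }

        @Override
        public void setup(MetricsStub metrics) {
            this.metrics = metrics;
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            for (int i = 0; i < upperBounds.length; i++) {
                if (key < upperBounds[i]) {
                    return Math.min(i, numPartitions - 1);
                }
            }
            // Key is past the last bound: clamp it to the last partition and count it.
            if (metrics != null) {
                metrics.clampedKeys.incrementAndGet();
            }
            return numPartitions - 1;
        }
    }

    public static void main(String[] args) {
        MetricsStub metrics = new MetricsStub();

        // A lambda written against the single-method interface still compiles.
        CustomPartitionerWrapper<String> legacy =
                new CustomPartitionerWrapper<>((key, n) -> Math.floorMod(key.hashCode(), n));
        legacy.setup(4, metrics);
        System.out.println(legacy.selectChannel("a"));

        CustomPartitionerWrapper<Integer> range =
                new CustomPartitionerWrapper<>(new RangePartitioner(10, 20, 30));
        range.setup(3, metrics);
        System.out.println(range.selectChannel(5));
        System.out.println(range.selectChannel(25));
        System.out.println(range.selectChannel(42));
        System.out.println(metrics.clampedKeys.get());
    }
}
{code}

Running `main` prints 1, 0, 2 and 2 for the four channel selections, then 1 for
the clamped-key counter, since only key 42 falls past the last bound.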
--
This message was sent by Atlassian Jira
(v8.20.10#820010)