Hi Leonard,

Thanks for the review! See my responses below:

1. Let's take for example the case that a split is a file (i.e. what I call
bounded). To calculate the pending records, the connector needs to know how
many splits are left and the number of records in each split. If a reader
only knows its split (i.e. file), then it wouldn't be able to calculate the
pending records. Furthermore, unlike Kafka, there is no concept of a
partition--a reader might be assigned any file, under any file path, etc.

2. Thanks for the catch, looks like I didn't commit my changes.

3. Similar to my response for (1), the metric implemented depends if the
split is bounded or unbounded. It wouldn't make much sense for a connector
to implement both. We could unify reporting the metric in the enumerator
but see the Rejected Alternatives section for why not.

4. Same issue here as in (2) regarding the implementation, looks like I
didn't commit my changes. The autoscaling algorithm needs to determine the
maximum theoretical parallelism of a source (e.g. max number of splits). To
handle the case when the split is completed, we need to report the splits
that are assigned to a reader and decrement the gauge when the split is
completed. If we want to introduce an additional RPC to the enumerator when
a split is completed, we can send an event from the reader in that
scenario. However, note that the enumerator doesn't know how many splits
are assigned in all cases, e.g. in the case that a job restarts.

I generally agree with you on the final status being awkward and I do agree
reporting this metric from the enumerator is cleaner, interface-wise. To
solve the problem of tracking splits in job restarts, we can send the
splits back to the enumerator from the readers on restore (and the
enumerator already has an API to handle `addSplitsBack`) and let the
enumerator re-assign the splits.

Best,
Mason

On Sat, Nov 18, 2023 at 8:43 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Mason for starting this thread discussion, generally +1 for the
> motivation and proposal .
>
>
> I have some questions about the detail after read the FLIP.
>
> 1. The FLIP says "However, pendingRecords is currently only reported by
> the SourceReader and doesn’t cover the case for sources that only the
> SourceEnumerator can calculate those metrics e.g. bounded split
> implementations like Iceberg." Could you explain more why we cannot
> calculate metrics in  SourceReader side?
>
> 2.minor: Looks like you mixed SplitEnumeratorMetricGroup and
> SourceReaderMetricGroup in public interface part
> @PublicEvolving
> public interface SplitEnumeratorMetricGroup extends
> OperatorCoordinatorMetricGroup {
>     // IIUC, these methods belongs to SourceReader?
>     Counter getNumRecordsInErrorsCounter();
>     void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
>     void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
>
>     /** new addition */
>     void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
> }
>
> 3. Could you explain the relation between your proposed
> PendingRecordsGauge in SplitEnumerator and PendingRecordsGauge in
> SourceReader? e.g. Which kind of connector developers needs to
> care/implements the two
> metrics.
>
> 4. The discussion thread context show me that you want to introduce
> setAssignedSplits method in sourceReader side, but the FLIP didn’t update
> yet like the implementation part? And, the final status looks strange to me
> that we calculate total assignedSplits in
> splitEnumerator but calculate total unAssignedSplits in SourceReader side
> following your design. The action ‘assign’ is happening in SplitEnumerator
> sider and should be surely managed by splitEnumerator,
> why a SourceReader never assigns any splits need to report its assigned
> splits?
>
> Best,
> Leonard
>
>
>
> > 2023年11月17日 上午6:52,Mason Chen <mas.chen6...@gmail.com> 写道:
> >
> > Hi all,
> >
> > I would like to start a discussion on FLIP-394: Add Metrics for Connector
> > Agnostic Autoscaling [1].
> >
> > This FLIP recommends adding two metrics to make autoscaling work for
> > bounded split source implementations like IcebergSource. These metrics
> are
> > required by the Flink Kubernetes Operator autoscaler algorithm [2] to
> > retrieve information for the backlog and the maximum source parallelism.
> > The changes would affect the `@PublicEvolving`
> `SplitEnumeratorMetricGroup`
> > API of the source connector framework.
> >
> > Best,
> > Mason
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> > [2]
> >
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
>
>

Reply via email to