Hi everyone,

I started the voting thread [1]. Please cast your vote there or ask
additional questions here.

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r70d321b6aa62ab4e31c8b73552b2de7846c4d31ed6f08d6541a9b36e%40%3Cdev.flink.apache.org%3E

On Fri, Jul 30, 2021 at 10:46 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Arvid,
>
> I think it is OK to leave eventTimeFetchLag out of the scope of this FLIP
> given that it may involve additional API changes.
>
> > 5. RecordMetadata is currently not simplifying any code. By the current
> > design RecordMetadata is a read-only data structure that is constant for
> > all records in a batch. So in Kafka, we still need to pass Tuple3 because
> > offset and timestamp are per record.
>
> Does this depend on whether we will get the RecordMetadata per record or
> per batch? We can define the semantics of RecordsWithSplitIds.metadata() to be
> the metadata associated with the last record returned by
> RecordsWithSplitIds.nextRecordsFromSplit(). In this case, individual
> implementations can decide whether to return different metadata for each
> record or not. In case of Kafka, the Tuple3 can be replaced with three
> lists of records, timestamps and offsets respectively. It probably saves
> some object instantiation, assuming the RecordMetadata object itself can be
> reused.
>
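The per-record semantics described above could be sketched roughly as follows. This is only an illustration under assumptions: KafkaLikeBatch, ReusableMetadata, nextRecord() and metadataOfCurrentRecord() are hypothetical names, and the RecordMetadata interface here is a stand-in, not the actual Flink type. The batch keeps records, timestamps and offsets in three parallel lists and reuses one metadata object, so no Tuple3 is created per record.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; none of these types are the real Flink interfaces. */
public class PerRecordMetadataSketch {

    /** Stand-in for the RecordMetadata discussed in this thread. */
    interface RecordMetadata {
        long timestamp();
        long offset();
    }

    /** One mutable instance that is refilled for every record. */
    static final class ReusableMetadata implements RecordMetadata {
        private long timestamp;
        private long offset;

        void set(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }

        @Override public long timestamp() { return timestamp; }
        @Override public long offset() { return offset; }
    }

    /** Batch holding records, timestamps and offsets in parallel lists. */
    static final class KafkaLikeBatch<E> {
        private final List<E> records;
        private final List<Long> timestamps;
        private final List<Long> offsets;
        private final ReusableMetadata metadata = new ReusableMetadata();
        private int index = -1; // position of the last record returned

        KafkaLikeBatch(List<E> records, List<Long> timestamps, List<Long> offsets) {
            this.records = records;
            this.timestamps = timestamps;
            this.offsets = offsets;
        }

        /** Returns the next record, or null when the batch is exhausted. */
        E nextRecord() {
            if (index + 1 >= records.size()) {
                return null;
            }
            index++;
            metadata.set(timestamps.get(index), offsets.get(index));
            return records.get(index);
        }

        /** Metadata of the last record returned by nextRecord(). */
        RecordMetadata metadataOfCurrentRecord() {
            if (index < 0) {
                throw new IllegalStateException("no record returned yet");
            }
            return metadata;
        }
    }

    /** Sums the offsets to show that the metadata follows the iteration. */
    static long sumOffsets(KafkaLikeBatch<String> batch) {
        long sum = 0;
        while (batch.nextRecord() != null) {
            sum += batch.metadataOfCurrentRecord().offset();
        }
        return sum;
    }

    public static void main(String[] args) {
        KafkaLikeBatch<String> batch = new KafkaLikeBatch<>(
                Arrays.asList("a", "b"),
                Arrays.asList(100L, 200L),
                Arrays.asList(7L, 8L));
        System.out.println(sumOffsets(batch)); // prints 15
    }
}
```

Because the metadata object is reused, a caller that wants to keep the values beyond the next call to nextRecord() would have to copy them.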
> > 6. We might rename and change the semantics into
> >
> > public interface RecordsWithSplitIds<E> {
> >     /**
> >      * Returns the record metadata. The metadata is shared for all
> > records in the current split.
> >      */
> >     @Nullable
> >     default RecordMetadata metadataOfCurrentSplit() {
> >         return null;
> >     }
> > ...
> > }
>
> Maybe we can move one step further to make it "metadataOfCurrentRecord()"
> as I mentioned above.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jul 30, 2021 at 3:00 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi folks,
> >
> > To move on with the FLIP, I will cut eventTimeFetchLag out of scope and
> > go ahead with the remainder.
> >
> > I will open a VOTE later today.
> >
> > Best,
> >
> > Arvid
> >
> > On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Becket,
> > >
> > > I have updated the PR according to your suggestion (note that this
> commit
> > > contains the removal of the previous approach) [1]. Here are my
> > > observations:
> > > 1. Adding the type of RecordMetadata to emitRecord would require adding
> > > another type parameter to RecordEmitter and SourceReaderBase. So I left
> > > that out for now as it would break things completely.
> > > 2. RecordEmitter implementations that want to pass it to SourceOutput
> > need
> > > to be changed in a boilerplate fashion. (passing the metadata to the
> > > SourceOutput)
> > > 3. RecordMetadata as an interface (as in the commit) probably requires
> > > boilerplate implementations in using sources as well.
> > > 4. SourceOutput would also require an additional collect
> > >
> > > default void collect(T record, RecordMetadata metadata) {
> > >     collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> > > }
> > >
> > > 5. RecordMetadata is currently not simplifying any code. By the current
> > > design RecordMetadata is a read-only data structure that is constant
> for
> > > all records in a batch. So in Kafka, we still need to pass Tuple3
> because
> > > offset and timestamp are per record.
> > > 6. RecordMetadata is currently the same for all splits in
> > > RecordsWithSplitIds.
> > >
> > > Some ideas for the above points:
> > > 3. We should accompany it with a default implementation to avoid trivial
> > > POJO implementations such as the KafkaRecordMetadata of my commit. Can we
> > > skip the interface and just have RecordMetadata as a base class?
> > > 1.,2.,4. We could also set the metadata only once in an orthogonal method
> > > that needs to be called before collect, like SourceOutput#setRecordMetadata.
> > > Then we can implement it entirely in SourceReaderBase without changing any
> > > code. The clear downside is that it introduces some implicit state in
> > > SourceOutput (which we implement) and is harder to use in
> > > non-SourceReaderBase classes: Source devs need to remember to call
> > > setRecordMetadata before collect for a respective record.
> > > 6. We might rename and change the semantics into
> > >
> > > public interface RecordsWithSplitIds<E> {
> > >     /**
> > >      * Returns the record metadata. The metadata is shared for all
> > records in the current split.
> > >      */
> > >     @Nullable
> > >     default RecordMetadata metadataOfCurrentSplit() {
> > >         return null;
> > >     }
> > > ...
> > > }
> > >
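The implicit-state downside of a setRecordMetadata-style method mentioned in the ideas above can be illustrated with a small sketch. Everything here is hypothetical: RecordingOutput is a toy stand-in for SourceOutput, and RecordMetadata is reduced to a single fetchTime() accessor for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an output that carries metadata as implicit state. */
public class StatefulOutputSketch {

    /** Stand-in for RecordMetadata, reduced to one accessor. */
    interface RecordMetadata {
        long fetchTime();
    }

    /** Toy output that remembers whatever metadata was set most recently. */
    static final class RecordingOutput<T> {
        private RecordMetadata current; // implicit state shared by later collect() calls
        final List<String> emitted = new ArrayList<>();

        void setRecordMetadata(RecordMetadata metadata) {
            this.current = metadata;
        }

        void collect(T record) {
            String fetchTime =
                    current == null ? "unknown" : Long.toString(current.fetchTime());
            emitted.add(record + "@" + fetchTime);
        }
    }

    static List<String> demo() {
        RecordingOutput<String> out = new RecordingOutput<>();
        out.collect("a");                 // metadata was never set
        out.setRecordMetadata(() -> 42L);
        out.collect("b");
        out.collect("c");                 // silently reuses the metadata set for "b"
        return out.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a@unknown, b@42, c@42]
    }
}
```

The record "c" shows the risk: nothing forces the caller to refresh the metadata, so a stale value is attached without any error.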
> > >
> > > Re global variable
> > >
> > >> To explain a bit more on the metric being a global variable, I think
> in
> > >> general there are two ways to pass a value from one code block to
> > another.
> > >> The first way is direct passing. That means the variable is explicitly
> > >> passed from one code block to another via arguments, be them in the
> > >> constructor or methods. Another way is indirect passing through
> context,
> > >> that means the information is stored in some kind of context or
> > >> environment, and everyone can have access to it. And there is no
> > explicit
> > >> value passing from one code block to another because everyone just
> reads
> > >> from/writes to the context or environment. This is basically the
> "global
> > >> variable" pattern I am talking about.
> > >>
> > >> In general people would avoid having a mutable global value shared
> > across
> > >> code blocks, because it is usually less deterministic and therefore
> more
> > >> difficult to understand or debug.
> > >>
> > > Since the first approach was using a Gauge, it's a callback and not a
> > > global value. The actual value is passed when invoking the callback.
> It's
> > > the same as a supplier. However, the gauge itself is stored in the
> > context,
> > > so your argument holds on that level.
> > >
> > >
> > >> Moreover, generally speaking, the Metrics in systems are usually
> > perceived
> > >> as a reporting mechanism. People usually think of it as a way to
> expose
> > >> some internal values to the external system, and don't expect the
> > program
> > >> itself to read the reported values again in the main logic, which is
> > >> essentially using the MetricGroup as a context to pass values across
> > code
> > >> block, i.e. the "global variable" pattern. Instead, people would
> usually
> > >> use the "direct passing" to do this.
> > >>
> > > Here I still don't see a difference on how we calculate the meter
> values
> > > from the byteIn/Out counters. We also need to read the counters
> > > periodically and calculate a secondary metric. So it can't be that
> > > unexpected to users.
> > >
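As an illustration of a secondary metric derived by periodically reading a counter, here is a minimal sketch. Counter and RateFromCounter are hypothetical classes written for this example and are not the Flink metric types; the update interval is assumed to be driven by some external scheduler.

```java
/** Hypothetical sketch: a rate derived by periodically sampling a counter. */
public class DerivedRateSketch {

    /** Minimal stand-in for a monotonically increasing counter. */
    static final class Counter {
        private long count;

        void inc(long n) { count += n; }

        long getCount() { return count; }
    }

    /** Reads the counter once per interval and reports events per second. */
    static final class RateFromCounter {
        private final Counter counter;
        private final long intervalSeconds;
        private long lastCount;
        private double rate;

        RateFromCounter(Counter counter, long intervalSeconds) {
            this.counter = counter;
            this.intervalSeconds = intervalSeconds;
        }

        /** Assumed to be called once every intervalSeconds by a scheduler. */
        void update() {
            long now = counter.getCount();
            rate = (now - lastCount) / (double) intervalSeconds;
            lastCount = now;
        }

        double getRate() { return rate; }
    }

    public static void main(String[] args) {
        Counter bytesIn = new Counter();
        RateFromCounter bytesInPerSecond = new RateFromCounter(bytesIn, 5);

        bytesIn.inc(500);
        bytesInPerSecond.update();
        System.out.println(bytesInPerSecond.getRate()); // prints 100.0

        bytesIn.inc(250);
        bytesInPerSecond.update();
        System.out.println(bytesInPerSecond.getRate()); // prints 50.0
    }
}
```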
> > > [1]
> > >
> >
> https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b
> > >
> > > On Tue, Jul 27, 2021 at 3:19 AM Becket Qin <becket....@gmail.com>
> wrote:
> > >
> > >> Hi Arvid,
> > >>
> > >> Thanks for the patient discussion.
> > >>
> > >> To explain a bit more on the metric being a global variable, I think
> in
> > >> general there are two ways to pass a value from one code block to
> > another.
> > >> The first way is direct passing. That means the variable is explicitly
> > >> passed from one code block to another via arguments, be them in the
> > >> constructor or methods. Another way is indirect passing through
> context,
> > >> that means the information is stored in some kind of context or
> > >> environment, and everyone can have access to it. And there is no
> > explicit
> > >> value passing from one code block to another because everyone just
> reads
> > >> from/writes to the context or environment. This is basically the
> "global
> > >> variable" pattern I am talking about.
> > >>
> > >> In general people would avoid having a mutable global value shared
> > across
> > >> code blocks, because it is usually less deterministic and therefore
> more
> > >> difficult to understand or debug.
> > >>
> > >> Moreover, generally speaking, the Metrics in systems are usually
> > perceived
> > >> as a reporting mechanism. People usually think of it as a way to
> expose
> > >> some internal values to the external system, and don't expect the
> > program
> > >> itself to read the reported values again in the main logic, which is
> > >> essentially using the MetricGroup as a context to pass values across
> > code
> > >> block, i.e. the "global variable" pattern. Instead, people would
> usually
> > >> use the "direct passing" to do this.
> > >>
> > >> >Can we think of other use cases for the fetchTime parameter beyond
> > >> metrics
> > >> in the future? If so, it would make an even stronger case.
> > >> At this point, I cannot think of other use cases for fetchTime, but I
> > can
> > >> see use cases where people want to get a per split fetch lag. So I am
> > >> wondering if it makes sense to generalize the API a little bit by
> > >> introducing collect(T Record, Long timestamp, Metadata metadata). This
> > >> also
> > >> makes a natural alignment because the RecordsWithSplitIds also
> returns a
> > >> Metadata associated with the record, which can be used by
> RecordEmitter
> > as
> > >> well as the SourceOutput.
> > >>
> > >> What do you think?
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise <ar...@apache.org> wrote:
> > >>
> > >> > Hi Becket,
> > >> >
> > >> > I still can't follow your view on the metric being a global variable
> > or
> > >> > your concern that it is confusing to users. Nevertheless, I like
> your
> > >> > proposal with having an additional collect method.
> > >> >
> > >> > I was thinking that
> > >> > > SourceOutput is going to have an additional method of collect(T
> > >> Record,
> > >> > > Long timestamp, Long fetchTime). So people can just pass in the
> > fetch
> > >> > time
> > >> > > directly when they emit a record, regardless of using
> > >> SourceReaderBase or
> > >> > > not.
> > >> > >
> > >> >
> > >> > Can we think of other use cases for the fetchTime parameter beyond
> > >> metrics
> > >> > in the future? If so, it would make an even stronger case.
> > >> >
> > >> > I'll update the PR with your proposals.
> > >> >
> > >> > Best,
> > >> >
> > >> > Arvid
> > >> >
> > >> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin <becket....@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Regarding the generic type v.s. class/subclasses of Metadata.
> > >> > >
> > >> > > I think generic types usually make sense if the framework/abstract
> > >> class
> > >> > > itself does not look into the instances, but just pass them from
> one
> > >> user
> > >> > > logic to another. Otherwise, interfaces or class/subclasses would
> be
> > >> > > preferred.
> > >> > >
> > >> > > In our case, it depends on whether we expect the SourceReaderBase
> to
> > >> look
> > >> > > into the MetaData. At this point, it does not. But it seems
> possible
> > >> that
> > >> > > in the future it may look into MetaData. Therefore I think the
> > class /
> > >> > > subclass pattern would be better.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jiangjie (Becket) Qin
> > >> > >
> > >> > >
> > >> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin <becket....@gmail.com>
> > >> wrote:
> > >> > >
> > >> > > > Hi Arvid,
> > >> > > >
> > >> > > > > I'm not sure if I follow the global variable argument, could
> you
> > >> > > > elaborate? Are you referring specifically to the SettableGauge?
> > How
> > >> is
> > >> > > that
> > >> > > > different from a Counter or Meter?
> > >> > > > What I meant is that the fetch lag computing logic can either
> get
> > >> the
> > >> > > > information required from method argument or something like a
> > static
> > >> > > global
> > >> > > > variable. We are essentially trying to reuse the metric as a
> > static
> > >> > > global
> > >> > > > variable. It seems not a common pattern in most systems. It is a
> > >> little
> > >> > > > counterintuitive that a gauge reported to the metric system
> would
> > be
> > >> > used
> > >> > > > by the program main logic later on as a variable.
> > >> > > >
> > >> > > > > We could do that. That would remove the gauge from the
> > >> MetricGroup,
> > >> > > > right? The main downside is that sources that do not use
> > >> > SourceReaderBase
> > >> > > > cannot set the metric anymore. So I'd rather keep the current
> way
> > >> and
> > >> > > > extend it with the metadata extension.
> > >> > > > Yes, that would remove the gauge from the MetricGroup. I was
> > >> thinking
> > >> > > that
> > >> > > > SourceOutput is going to have an additional method of collect(T
> > >> Record,
> > >> > > > Long timestamp, Long fetchTime). So people can just pass in the
> > >> fetch
> > >> > > time
> > >> > > > directly when they emit a record, regardless of using
> > >> SourceReaderBase
> > >> > or
> > >> > > > not.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jiangjie (Becket) Qin
> > >> > > >
> > >> > > > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler <
> > >> ches...@apache.org>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> The only histogram implementations available to use are those by
> > >> > > >> dropwizard, and they do some lock-free synchronization stuff that so far
> > >> > > >> we wanted to keep out of hot paths (this applies to both reading and
> > >> > > >> writing); we have, however, never made benchmarks.
> > >> > > >> But it is reasonable to assume that they are way more expensive
> > >> than
> > >> > the
> > >> > > >> alternatives (in the ideal case just being a getter).
> > >> > > >> You'd pay that cost irrespective of whether a reporter is
> enabled
> > >> or
> > >> > > not,
> > >> > > >> which is another thing we so far wanted to prevent.
> > >> > > >> Finally, histograms are problematic because they are 10x more
> > >> > expensive
> > >> > > >> on the metric backend (because they are effectively 10
> metrics),
> > >> and
> > >> > > should
> > >> > > >> be used with extreme caution, in particular because we lack any
> > >> switch
> > >> > > to
> > >> > > >> disable/enable metrics (and I think we are getting to a point
> > where
> > >> > the
> > >> > > >> metric system becomes unusable for heavy users because of that,
> > >> where
> > >> > > >> another histogram isn't helping).
> > >> > > >>
> > >> > > >> Overall, at this time I'm against using Histograms.
> > >> > > >> Furthermore, I believe that we should be able to derive a Histogram from
> > >> > > >> that supplier if we later on decide differently. We'd just need to poll
> > >> > > >> the supplier more often.
> > >> > > >>
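One way to read "derive a Histogram from that supplier" is to poll the supplier into a bounded sample window and compute quantiles from it. The sketch below assumes exactly that; SampleWindow and its methods are hypothetical names, and the nearest-rank quantile is a simplification chosen for illustration rather than what any existing histogram implementation does.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

/** Hypothetical sketch: quantiles derived by polling a value supplier. */
public class PolledHistogramSketch {

    /** Fixed-size ring buffer holding the most recent polled samples. */
    static final class SampleWindow {
        private final long[] samples;
        private int size; // number of valid samples, at most samples.length
        private int next; // slot that the next sample overwrites

        SampleWindow(int capacity) {
            this.samples = new long[capacity];
        }

        /** Takes one sample; polling more often fills the window faster. */
        void poll(LongSupplier supplier) {
            samples[next] = supplier.getAsLong();
            next = (next + 1) % samples.length;
            if (size < samples.length) {
                size++;
            }
        }

        /** Nearest-rank quantile over the current window, 0 < q <= 1. */
        long quantile(double q) {
            if (size == 0) {
                throw new IllegalStateException("no samples polled yet");
            }
            long[] sorted = Arrays.copyOf(samples, size);
            Arrays.sort(sorted);
            int rank = (int) Math.ceil(q * size) - 1;
            return sorted[Math.max(0, rank)];
        }
    }

    public static void main(String[] args) {
        long[] values = {10, 20, 30, 40};
        int[] cursor = {0};
        LongSupplier lastFetchLag = () -> values[cursor[0]++];

        SampleWindow window = new SampleWindow(4);
        for (int i = 0; i < values.length; i++) {
            window.poll(lastFetchLag);
        }
        System.out.println(window.quantile(0.5)); // prints 20
        System.out.println(window.quantile(1.0)); // prints 40
    }
}
```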
> > >> > > >> On 22/07/2021 09:36, Arvid Heise wrote:
> > >> > > >>
> > >> > > >> Hi all,
> > >> > > >>
> > >> > > >> @Steven Wu <stevenz...@gmail.com>
> > >> > > >>
> > >> > > >>> Regarding "lastFetchTime" latency metric, I found Gauge to be
> > less
> > >> > > >>> informative as it only captures the last sampling value for
> each
> > >> > metric
> > >> > > >>> publish interval (e.g. 60s).
> > >> > > >>> * Can we make it a histogram? Histograms are more expensive
> > >> though.
> > >> > > >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg,
> > count.
> > >> but
> > >> > > >>> there
> > >> > > >>> is no such metric type in Flink
> > >> > > >>> * Summary metric type [3] (from Prometheus) would be nice too
> > >> > > >>>
> > >> > > >> I'd also think that a histogram is much more expressive but the
> > >> > original
> > >> > > >> FLIP-33 decided against it because of its cost. @Chesnay
> > Schepler
> > >> > > >> <ches...@apache.org> could you shed some light on how much
> more
> > >> > > >> expensive it is in comparison to a simple gauge? Does it depend
> > on
> > >> > > whether
> > >> > > >> a reporter is actually using the metric?
> > >> > > >> The current interface of this FLIP-179 would actually allow to
> > >> switch
> > >> > > the
> > >> > > >> type of the metric later. But since the metric type is
> > >> user-facing, we
> > >> > > need
> > >> > > >> to have an agreement now.
> > >> > > >>
> > >> > > >> @Becket Qin <becket....@gmail.com>
> > >> > > >>
> > >> > > >>> In that case, do we still need the metric here? It seems we
> are
> > >> > > creating
> > >> > > >>> a
> > >> > > >>> "global variable" which users may potentially use. I am
> > wondering
> > >> how
> > >> > > >>> much
> > >> > > >>> additional convenience it provides because it seems easy for
> > >> people
> > >> > to
> > >> > > >>> simply pass the fetch time by themselves if they have decided
> to
> > >> not
> > >> > > use
> > >> > > >>> SourceReaderBase. Also, it looks like we do not have an API
> > >> pattern
> > >> > > that
> > >> > > >>> lets users get the value of a metric and derive another
> metric.
> > >> So I
> > >> > > >>> think
> > >> > > >>> it is easier for people to understand if LastFetchTimeGauge()
> is
> > >> just
> > >> > > an
> > >> > > >>> independent metric by itself, instead of being a part of the
> > >> > > >>> eventTimeFetchLag computation.
> > >> > > >>>
> > >> > > >> I'm not sure if I follow the global variable argument, could
> you
> > >> > > >> elaborate? Are you referring specifically to the SettableGauge?
> > >> How is
> > >> > > that
> > >> > > >> different from a Counter or Meter?
> > >> > > >>
> > >> > > >> With the current design, we could very well add a LastFetchTime
> > >> > metric.
> > >> > > >> The key point of the current abstraction is that a user gets
> the
> > >> much
> > >> > > >> harder eventTimeFetchLag metric for free, since we already need
> > to
> > >> > > extract
> > >> > > >> the event time for other metrics. I think the JavaDoc makes it
> > >> clear
> > >> > > what
> > >> > > >> the intent of the LastFetchTimeGauge is and if not we can
> improve
> > >> it.
> > >> > > >> Btw we have derived metrics already. For example, we have
> Meters
> > >> for
> > >> > > >> byteIn/Out and recordIn/Out. That's already part of FLIP-33.
> > >> > > >>
> > >> > > >> Would it make sense to have a more generic metadata type <T>
> > >> > associated
> > >> > > >>> with the records batch? In some cases, it may be useful to
> allow
> > >> the
> > >> > > >>> Source
> > >> > > >>> implementation to carry some additional information of the
> batch
> > >> to
> > >> > the
> > >> > > >>> RecordEmitter. For example, the split info of the batch, the
> > >> sender
> > >> > of
> > >> > > >>> the
> > >> > > >>> batch etc. Because the RecordEmitter only takes one record
> at a
> > >> time,
> > >> > > >>> currently such information needs to be put into each record,
> > which
> > >> > may
> > >> > > >>> involve a lot of wrapper object creation.
> > >> > > >>>
> > >> > > >> I like the idea of having more general metadata and I follow
> the
> > >> > > example.
> > >> > > >> I'm wondering if we could avoid a generic type (since that
> adds a
> > >> bit
> > >> > of
> > >> > > >> complexity to the mental model and usage) by simply encouraging
> > to
> > >> > use a
> > >> > > >> more specific MetaData subclass as a return type of the method.
> > >> > > >>
> > >> > > >> public interface RecordsWithSplitIds<E> {
> > >> > > >>     @Nullable
> > >> > > >>     default RecordMetadata getMetadata() {
> > >> > > >>         return null;
> > >> > > >>     }
> > >> > > >>     ...
> > >> > > >> }
> > >> > > >>
> > >> > > >> public interface RecordMetadata {
> > >> > > >>     long getLastFetchTime(); // mandatory?
> > >> > > >> }
> > >> > > >>
> > >> > > >> And using it as
> > >> > > >>
> > >> > > >> public class KafkaRecordMetadata implements RecordMetadata {}
> > >> > > >>
> > >> > > >> private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
> > >> > > >>     @Override
> > >> > > >>     public KafkaRecordMetadata getMetadata() {
> > >> > > >>         return metadata;
> > >> > > >>     }
> > >> > > >> }
> > >> > > >>
> > >> > > >> Or do we want to have the generic to explicitly pass it to the
> > >> > > >> RecordEmitter? Would that metadata be a fourth parameter of
> > >> > > >> RecordEmitter#emitRecord?
> > >> > > >>
> > >> > > >> It might be slightly better if we let the method accept a
> > Supplier
> > >> in
> > >> > > this
> > >> > > >>> case. However, it seems to introduce a parallel channel or a
> > >> sidepath
> > >> > > >>> between the user implementation and SourceOutput. I am not
> sure
> > if
> > >> > this
> > >> > > >>> is
> > >> > > >>> the right way to go. Would it be more intuitive if we just
> add a
> > >> new
> > >> > > >>> method
> > >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
> > >> > explicitly?
> > >> > > >>> This would work well with the change I suggested above, which
> > >> adds a
> > >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
> > >> that to
> > >> > > the
> > >> > > >>> RecordEmitter.emitRecord() as an argument.
> > >> > > >>>
> > >> > > >>
> > >> > > >> We could do that. That would remove the gauge from the
> > MetricGroup,
> > >> > > >> right? The main downside is that sources that do not use
> > >> > > SourceReaderBase
> > >> > > >> cannot set the metric anymore. So I'd rather keep the current
> way
> > >> and
> > >> > > >> extend it with the metadata extension.
> > >> > > >>
> > >> > > >> Best,
> > >> > > >>
> > >> > > >> Arvid
> > >> > > >>
> > >> > > >>
> > >> > > >> On Wed, Jul 21, 2021 at 1:38 PM Becket Qin <
> becket....@gmail.com
> > >
> > >> > > wrote:
> > >> > > >>
> > >> > > >>> Hey Chesnay,
> > >> > > >>>
> > >> > > >>> I think I got what that method was designed for now. Basically
> > the
> > >> > > >>> motivation is to let the SourceOutput to report the
> > >> eventTimeFetchLag
> > >> > > for
> > >> > > >>> users. At this point, the SourceOutput only has the EventTime,
> > so
> > >> > this
> > >> > > >>> method provides a way for the users to pass the FetchTime to
> the
> > >> > > >>> SourceOutput. This is essentially a context associated with
> each
> > >> > record
> > >> > > >>> emitted to the SourceOutput.
> > >> > > >>>
> > >> > > >>> It might be slightly better if we let the method accept a
> > >> Supplier in
> > >> > > >>> this
> > >> > > >>> case. However, it seems to introduce a parallel channel or a
> > >> sidepath
> > >> > > >>> between the user implementation and SourceOutput. I am not
> sure
> > if
> > >> > this
> > >> > > >>> is
> > >> > > >>> the right way to go. Would it be more intuitive if we just
> add a
> > >> new
> > >> > > >>> method
> > >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
> > >> > explicitly?
> > >> > > >>> This would work well with the change I suggested above, which
> > >> adds a
> > >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
> > >> that to
> > >> > > the
> > >> > > >>> RecordEmitter.emitRecord() as an argument.
> > >> > > >>>
> > >> > > >>> What do you think?
> > >> > > >>>
> > >> > > >>> Thanks,
> > >> > > >>>
> > >> > > >>> Jiangjie (Becket) Qin
> > >> > > >>>
> > >> > > >>> On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler <
> > >> ches...@apache.org
> > >> > >
> > >> > > >>> wrote:
> > >> > > >>>
> > >> > > >>> > Would it be easier to understand if the method would accept
> a
> > >> > > Supplier
> > >> > > >>> > instead?
> > >> > > >>> >
> > >> > > >>> > On 20/07/2021 05:36, Becket Qin wrote:
> > >> > > >>> > > In that case, do we still need the metric here? It seems
> we
> > >> are
> > >> > > >>> creating
> > >> > > >>> > a
> > >> > > >>> > > "global variable" which users may potentially use. I am
> > >> wondering
> > >> > > how
> > >> > > >>> > much
> > >> > > >>> > > additional convenience it provides because it seems easy
> for
> > >> > people
> > >> > > >>> to
> > >> > > >>> > > simply pass the fetch time by themselves if they have
> > decided
> > >> to
> > >> > > not
> > >> > > >>> use
> > >> > > >>> > > SourceReaderBase. Also, it looks like we do not have an
> API
> > >> > pattern
> > >> > > >>> that
> > >> > > >>> > > lets users get the value of a metric and derive another
> > >> metric.
> > >> > So
> > >> > > I
> > >> > > >>> > think
> > >> > > >>> > > it is easier for people to understand if
> > LastFetchTimeGauge()
> > >> is
> > >> > > >>> just an
> > >> > > >>> > > independent metric by itself, instead of being a part of
> the
> > >> > > >>> > > eventTimeFetchLag computation.
> > >> > > >>> >
> > >> > > >>> >
> > >> > > >>> >
> > >> > > >>>
> > >> > > >>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> > >
> >
>
