Hi,

a few keywords in this thread caught my attention, so apologies for jumping in late, but I'd like to understand the nature of the proposal.

I'll try to summarize my understanding:

The goal of the FLIP is to support automatic switching between streaming and batch processing, leveraging the fact that batch processing is computationally more efficient. This makes perfect sense.

Looking at the streaming vs. batch semantics, switching from streaming to batch means the following:

 a) generally, watermarks are not propagated in batch; the watermark moves from -inf to +inf in one step at the end of the batch input, which might (and probably will) skip many timer invocations

 b) grouping by key (and window) can be done efficiently, because it can be implemented as sort-and-group and ideally parallelized by window (with some caveats)

The switch also has some conditions, namely:

 i) batch mode does not take checkpoints, so inputs must be accessible repeatedly (forever)

 ii) due to failures in batch mode, inputs might be reprocessed and thus must be immutable, or else all sub-results computed in all branches of the computation (even those possibly unaffected by the failure) have to be discarded and recomputed from scratch

Obviously, in the case of a switch from batch to streaming, property a) has to be modified so that the watermark does not move to +inf, but to min(streaming watermark). Given these properties, it should be possible to exchange batch and streaming processing without any cooperation from the application logic itself. Is my understanding correct?

If so, there is still one open question regarding efficiency, though. The streaming operator _might_ need its input sorted by timestamp (e.g. when processing time-series data, or even sequential data). In that case, simply switching from streaming semantics to batch processing does not yield efficient processing, because the operator still needs to buffer and manually sort all the input data (batch data is always unordered). On the other hand, the batch runner already does sorting (for grouping by key), so adding an additional sorting criterion is very cheap. In Apache Beam, we introduced a property of a stateful PTransform (DoFn) called @RequiresTimeSortedInput [1], which can then be implemented efficiently by batch engines.
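For illustration, a minimal sketch of such a stateful DoFn in Beam (pipeline wiring omitted; the names and the delta computation are illustrative only):

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Sketch only: a stateful DoFn that relies on per-key event-time ordering.
    class DeltaFn extends DoFn<KV<String, Long>, Long> {

        @StateId("last")
        private final StateSpec<ValueState<Long>> lastSpec = StateSpecs.value();

        @RequiresTimeSortedInput  // elements arrive sorted by event timestamp within each key
        @ProcessElement
        public void process(
                @Element KV<String, Long> element,
                @StateId("last") ValueState<Long> last,
                OutputReceiver<Long> out) {
            Long prev = last.read();
            long current = element.getValue();
            // Sequential logic stays simple because inputs are time-ordered per key.
            out.output(prev == null ? current : current - prev);
            last.write(current);
        }
    }

A batch runner can satisfy this contract almost for free by adding the timestamp as a secondary sort key to the grouping it already performs.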

Does the FLIP somehow address conditions i) and ii)? I can imagine, for instance, that if data is read from, say, Kafka, and the backlog gets sufficiently large, then even batch processing can take a substantial amount of time, and if it fails after a long run, some of the original data might already have been rolled out of the Kafka topic.

In the FLIP there are some proposed changes to sources so that they emit metadata about whether the records come from a backlog. What is the driving line of thought behind why this is needed? From my point of view, streaming engines are _always_ processing a backlog; the only question is "how delayed are the currently processed events behind HEAD", or more specifically in this case, "how many elements can we expect to process if the source immediately stopped receiving more data?". This should be configurable using a simple option defining the allowed difference between the current processing time (on the JM) and the watermark of the source, or am I missing something?
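Concretely, the kind of check I have in mind is roughly the following (the option name and helper are purely illustrative, not existing Flink API):

    import java.time.Duration;

    // Illustration only: "backlog" == the source watermark lags behind wall-clock
    // time by more than a configured threshold (the hypothetical option).
    static boolean isProcessingBacklog(long sourceWatermarkMillis, Duration lagThreshold) {
        return System.currentTimeMillis() - sourceWatermarkMillis > lagThreshold.toMillis();
    }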

Thanks for the clarification, and all the best,

 Jan

[1] https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

On 8/31/23 13:17, Xuannan Su wrote:
Hi all,

I would like to share some updates on FLIP-327. Dong and I have had a
series of discussions and have made several refinements to the FLIP.

The major change to the FLIP is to allow the input of the one-input
operator to be automatically sorted during backlog processing. When
combined with the state backend optimization introduced in FLIP-325 [1],
all the keyed single-input operators can achieve performance similar to batch mode during backlog processing, without any code change to the operator. We also implemented a POC [2] and conducted a benchmark [3] using the KeyedStream#reduce operation. The benchmark results demonstrate the performance gains that this FLIP can offer.
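(For readers who want to try something similar: a minimal keyed-reduce job of the kind the benchmark exercises might look like the sketch below. It is not the actual benchmark code from [3]; the record count, key fan-out, and sink are arbitrary.)

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyedReduceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSequence(0, 1_000_000L)
                    .map(i -> Tuple2.of(i % 1_000, i))
                    .returns(Types.TUPLE(Types.LONG, Types.LONG))
                    .keyBy(t -> t.f0)
                    // The keyed reduce is the operation whose backlog throughput is measured.
                    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                    .print();
            env.execute("keyed-reduce-sketch");
        }
    }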

I am looking forward to any comments or feedback you may have on this FLIP.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
[2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
[3]
https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java



On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for the explanation.

To recap our offline discussion, there is a concern regarding the
capability to dynamically switch between stream and batch modes. This
concern is around unforeseen behaviors such as bugs or performance
regressions, which we might not be aware of yet. The reason for this
concern is that this feature involves a fundamental impact on the Flink
runtime's behavior.

Due to the above concern, I agree it is reasonable to annotate related
APIs
as experimental. This step would provide us with the flexibility to modify
these APIs if issues arise in the future. This annotation also serves as a
note to users that this functionality might not perform as well as expected.

Though I believe that we can ensure the reliability of this feature
through
good design and code reviews, comprehensive unit tests, and thorough
integration testing, I agree that it is reasonable to be extra cautious in
this case. Also, it should be OK to delay making these APIs non-experimental by 1-2 releases.

I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs
as experimental. Please let me know if you think any other API should also
be marked as experimental.

Thanks!
Dong

On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

Hi Dong,

The Operators API is unfortunately also our public-facing API, and I mean that the APIs we will add there should also be marked `@Experimental` IMO.

The config options should also be marked as experimental (both annotated @Experimental and noted the same thing in the docs, if the @Experimental annotation is not automatically mentioned in the docs).

Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

We should do this independently from marking the APIs/config options as
`@Experimental`

Best,
Piotrek

On Fri, Aug 11, 2023 at 14:55 Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for the reply!

On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi,

Sorry for the long delay in responding!

Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?

I think it's fine. It would be best to mark this feature as experimental, and we say that the config keys or the default values might change in the future.

In general I agree we can mark APIs that determine "whether to enable dynamic switching between stream/batch mode" as experimental.

However, I am not sure we have such an API yet. The APIs added in this FLIP are intended to be used by operator developers rather than end users. End users can enable this capability by setting execution.checkpointing.interval-during-backlog = Long.MAX and using a source which might implicitly set the backlog status (e.g. HybridSource). So execution.checkpointing.interval-during-backlog is the only user-facing API that can always control whether this feature can be used.

However, execution.checkpointing.interval-during-backlog itself is not tied to FLIP-327.
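For illustration, wiring that option in could look roughly like the sketch below (the value is a placeholder; as discussed elsewhere in this thread, the exact value that enables the backlog-mode optimizations is part of the design):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BacklogIntervalSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Placeholder value; see the discussion in this thread for the semantics.
            conf.setString("execution.checkpointing.interval-during-backlog", "0");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build and execute the job on env as usual ...
        }
    }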

Do you mean we should set checkpointing.interval-during-backlog as experimental? Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?

Best,
Dong


Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?

Sure, we can do that.

Best,
Piotrek

On Sun, Jul 23, 2023 at 14:32 Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks a lot for the explanation. Please see my reply inline.

On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi Dong,

Thanks a lot for the answers. I can now only briefly answer your last email.

It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running batch mode might also be slower than its throughput in stream mode due to the same reason.
Yes, I know, but the thing that worries me is that previously only a user alone could decide whether to use batch mode or streaming, and in practice one user would rarely (if ever) use both for the same problem/job/query. If his intention was to eventually process live data, he was using streaming even if there was a large backlog at the start (apart from some very few power users).

With this change, we want to introduce a mode that would be switching back and forth between streaming and "batch in streaming" automatically. So a potential performance regression would be much more visible and painful at the same time. If a batch query runs slower than it could, that's kind of fine, as it will end at some point. If a streaming query switches to batch processing during large backpressure or maybe a temporary load spike, that's a bigger deal. Especially if the batch processing mode will not be able to actually handle even the normal load after the load spike. In that case, the job could never recover from the backpressure/backlog mode.

I understand you are concerned with the risk of performance regression introduced due to switching to batch mode.

After thinking about this more, I think this existing proposal meets the minimum requirement of "not introducing regression for existing jobs". The reason is that even if batch mode can be slower than stream mode for some operators in some cases, this is an optional feature that will only be enabled if a user explicitly overrides the newly introduced config to non-default values. Existing jobs that simply upgrade their Flink library version will not suffer any performance regression.

More specifically, in order to switch to batch mode, users will need to explicitly set execution.checkpointing.interval-during-backlog to 0. And users can always explicitly update execution.checkpointing.interval-during-backlog to turn off the batch mode if that incurs any performance issue.

As far as I can tell, for all practical workloads we see in production jobs, batch mode is always faster (w.r.t. throughput) than stream mode when there is a high backlog of incoming records. Though it is still theoretically possible, it should be very rare (if any) for batch mode to be slower in practice. Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?


execution.backlog.use-full-batch-mode-on-start (default false)

Oops, sorry, it was supposed to be something like:

execution.backlog.use-batch-mode-only-on-start (default false)

That option would disallow switching from streaming to batch. Batch mode would be allowed only to get rid of the initial backlog present on start-up.

It would allow us to safely experiment with switching from streaming to batch, and I would actually be more comfortable enabling "use batch mode on start" by default, until we gain confidence and feedback that switching back & forth is working as expected.

Now I understand what you are suggesting. I agree that it is necessary for users to be able to disallow switching from streaming to batch.

I am not sure it is necessary to introduce an extra config just for this purpose. The reason is that we don't have any strategy that switches the backlog status from false to true yet. And when we have such a strategy (e.g. FLIP-328) in the future, it is very likely that we will introduce extra config(s) for users to explicitly turn on such a feature. That means users should be able to turn off this feature even if we don't have something like execution.backlog.use-batch-mode-only-on-start.

Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?


Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow up with switching back and forth?

Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.

Great, let's do that. In a follow up FLIP we could restart the discussion about switching back and forth.

Cool, I added the following statement to the motivation section.

"NOTE: this FLIP focuses only on the capability to switch from batch to stream mode. If there is any extra API needed to support switching from stream to batch mode, we will discuss them in a follow-up FLIP."

I am looking forward to reading your follow-up thoughts!

Best,
Dong


Piotrek

On Thu, Jul 20, 2023 at 16:57 Dong Lin <lindon...@gmail.com> wrote:
Hi Piotr,

Thank you for the very detailed comments! Please see my reply inline.

On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi Dong,

I have a couple of follow up questions about switching back and forth between streaming and batching mode. Especially around shuffle/watermark strategy, and keyed state backend. First of all, it might not always be beneficial to switch into the batch modes:

- Shuffle strategy
    - Is sorting going to be purely in-memory? If not, obviously spilling to disks might cause larger overheads compared to not sorting the records.

Sorting might require spilling data to disk depending on the input size. The behavior of sorting w.r.t. memory/disk is expected to be exactly the same as the behavior of input sorting automatically performed by Flink runtime in batch mode for keyed inputs.

More specifically, ExternalSorter <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java> is currently used to sort keyed inputs in batch mode. It is automatically used by Flink runtime in OneInputStreamTask (here <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>) and in MultiInputSortingDataInput (here <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>). We plan to re-use the same code/mechanism to do sorting.

It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running batch mode might also be slower than its throughput in stream mode due to the same reason. However, even though it is possible in theory, I expect that in practice the throughput of using sorting + BatchExecutionKeyedStateBackend should be much higher than using other keyed statebackends when the amount of data is large. As a matter of fact, we have not heard of complaints of such performance regression issues in batch mode.

The primary goal of this FLIP is to allow the operator to run at the same throughput (in stream mode when there is backlog) as it can currently do in batch mode. And this goal is not affected by the disk overhead issue mentioned above.

I am thinking maybe we can treat it as an orthogonal performance optimization problem instead of solving this problem in this FLIP?
    - If it will be at least partially in-memory, does Flink have some mechanism to reserve optional memory that can be revoked if a new operator starts up? Can this memory be redistributed? Ideally we should use as much as possible of the available memory to avoid spilling costs, but also being able to revoke that memory

This FLIP does not support dynamically revoking/redistributing managed memory used by the ExternalSorter.

For operators with isInternalSorterSupported = true, we will allocate to this operator execution.sorted-inputs.memory <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144> amount of managed memory. This is the same as how Flink allocates managed memory to an operator when this operator has keyed inputs in batch mode. Note that this FLIP intends to support operators to sort inputs whenever there is backlog. And there is currently no way for an operator to know in advance whether there will be no backlog after a given time. So it seems simpler to just keep managed memory for such an operator throughout the lifecycle of this operator, for now.

Besides, it seems that the lack of ability to dynamically revoke/redistribute un-used managed memory is an existing issue in Flink. For example, we might have two operators sharing the same slot and these two operators both use managed memory (e.g. to sort inputs). There is currently no way for one operator to re-use the memory not used by the other operator.

Therefore, I think we can treat this as an orthogonal performance optimization problem which can be addressed separately. What do you think?

    - Sometimes sorting, even if we have memory to do that, might be an unnecessary overhead.
- Watermarks
    - Is holding back watermarks always good? If we have tons of data buffered/sorted and waiting to be processed with multiple windows per key and many different keys. When we switch back to `isBacklog=false` we first process all of that data before processing watermarks, for operators that are not using sorted input the state size can explode significantly causing lots of problems. Even for those that can use sorting, switching to sorting or BatchExecutionKeyedStateBackend is not always a good idea, but keeping RocksDB also can be risky.

With the current FLIP, the proposal is to use a sorter only when the inputs have keys. According to this practice, operators which are not using sorting should have un-keyed inputs. I believe such an operator will not even use a keyed state backend. Maybe I missed some use-case. Can you provide a use-case where we will have an operator with un-keyed inputs whose state size can explode due to us holding back watermarks?

For operators with keyed inputs that use sorting, I suppose it is possible that sorting + BatchExecutionKeyedStateBackend can be worse than using RocksDB. But I believe this is very rare (if possible) in almost all practical usage of Flink.

Taking one step back, if this indeed causes a regression for a real use-case, the user can set execution.checkpointing.interval-during-backlog to anything other than 0 so that this FLIP will not use sorter + BatchExecutionKeyedStateBackend even when there is backlog.

I would hope we can find a way to automatically determine whether using sorting + BatchExecutionKeyedStateBackend can be better or worse than using RocksDB alone. But I could not find a good and reliable way to do this. Maybe we can update Flink to do this when we find a good way to do this in the future?



- Keyed state backend
    - I think you haven't described what happens during switching from streaming to backlog processing.

Good point. This indeed needs to be described. I added a TODO in the "Behavior changes ..." section to describe what happens when isBacklog switches from false to true, for all watermark/checkpoint/statebackend etc. Let me explain this for the state backend here for now. I will update the FLIP later.

When isBacklog switches from false to true, an operator with keyed inputs can optionally (as determined by its implementation) start to use an internal sorter to sort inputs by key, without processing inputs or updating the state backend, until it receives end-of-inputs or isBacklog is switched to false again.
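To make this concrete, a rough and purely illustrative sketch of that behavior, with made-up hook names rather than the actual FLIP-327 operator API:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative pseudo-code; the hook names (onBacklogChanged, processElement)
    // are placeholders, not the FLIP-327 API.
    class InternalSorterSketch {

        record Record(String key, long value) {}

        private boolean backlog = false;
        private final List<Record> buffer = new ArrayList<>(); // stand-in for an external sorter

        void onBacklogChanged(boolean isBacklog) {
            if (backlog && !isBacklog) {
                // Backlog ended: sort buffered records by key, then process them in one pass.
                buffer.sort(Comparator.comparing(Record::key));
                buffer.forEach(this::processAndUpdateState);
                buffer.clear();
            }
            backlog = isBacklog;
        }

        void processElement(Record r) {
            if (backlog) {
                buffer.add(r); // defer processing; do not touch the state backend yet
            } else {
                processAndUpdateState(r); // normal streaming path
            }
        }

        private void processAndUpdateState(Record r) {
            // operator-specific logic goes here
        }
    }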



    - Switch can be an unnecessary overhead.

I agree it can cause unnecessary overhead, particularly when isBacklog switches back and forth frequently. Whether or not this is unnecessary likely depends on the duration/throughput of the backlog phase as well as the specific computation logic of the operator. I am not sure there is a good way for Flink to determine in advance whether switching is unnecessary.

Note that for the existing use-cases where we expect to change isBacklog to true (e.g. MySQL CDC snapshot phase, Kafka source watermark lag being too high), we don't expect the watermark to switch back and forth frequently. And users can disable this switch by setting execution.checkpointing.interval-during-backlog to anything other than 0. Therefore, I am wondering if we can also view this as a performance optimization opportunity for extra use-cases in the future, rather than a blocking issue of this FLIP for the MVP use-case (e.g. snapshot phase for any CDC source, Kafka watermark lag).


At the same time, in your current proposal, for `execution.checkpointing.interval-during-backlog > 0` we won't switch to "batch" mode at all. That's a bit of a shame; I don't understand why those two things should be coupled together?

We can in general classify optimizations as those that are compatible with checkpointing, and those that are not compatible with checkpointing. For example, input sorting is currently not compatible with checkpointing. And buffering input records to reduce state backend overhead (and probably columnar processing for mini-batch in the future) is compatible with checkpointing.

The primary goal of FLIP-327 is to support optimizations not compatible with checkpointing. If execution.checkpointing.interval-during-backlog > 0, which means that the user intends to still do checkpointing even when there is backlog, then we will not be able to support such optimizations.

For optimizations that are compatible with checkpointing, we can do this even when the operator does not run in "batch mode". There are extra problems to solve in order to achieve this optimization, such as supporting unaligned checkpointing without prolonging its sync phase. I plan to explain how this can be done in FLIP-325.


All in all, shouldn't we aim for some more clever process of switching back and forth between streaming/batch modes for watermark strategy/state backend/sorting based on some metrics? Trying to either predict if switching might help, or trying to estimate if the last switch was beneficial? Maybe something along the lines:

- sort only in memory and during sorting count the number of distinct keys (NDK)
    - maybe allow for spilling if so far in memory we have NDK * 5 = #records
- do not allow to buffer records above a certain threshold, as otherwise checkpointing can explode
- switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 = #records
- do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records

Or even maybe for starters something even simpler and then test out something more fancy as a follow up?

I agree it is worth investigating these ideas to further optimize the performance during backlog.

I just think these can be done independently after this FLIP. The focus of this FLIP is to re-use in stream mode the same optimization which we already use in batch mode, rather than inventing or improving the performance of these existing optimizations.

Given that there are already a lot of new mechanisms/features to discuss and address in this FLIP, I am hoping we can limit the scope of this FLIP to re-use the existing optimization, and do these extra optimization opportunities as future work.

What do you think?


At the same time,
`execution.checkpointing.interval-during-backlog=0`
seems
a weird setting to me, that I would
not feel safe recommending to anyone. If processing of a
backlog
takes
a
long time, a job might stop making
any progress due to some random failures. Especially dangerous
if a
job
switches from streaming mode back to
backlog processing due to some reasons, as that could happen
months
after
someone started a job with this
strange setting. So should we even have it? I would simply
disallow
it. I
Good point. I do agree we need further work to improve the failover performance in case any task fails.

As of the current FLIP, if any task fails during backlog and execution.checkpointing.interval-during-backlog = 0, we will need to restart all operators to the last checkpointed state and continue processing backlog. And this can be a lot of rollback since there is no checkpoint during backlog. And this can also be worse than batch, since this FLIP currently does not support exporting/saving records to local disk (or a shuffle service) so that a failed task can re-consume the records from the upstream task (or shuffle service) in the same way as how Flink fails over a task in batch mode.

I think we can extend this FLIP to solve this problem so that it can have at least the same behavior/performance as a batch-mode job. The idea is to also follow what batch mode does. For example, we can trigger a checkpoint when isBacklog switches to true, and every operator should buffer its output in the TM local disk (or remote shuffle service). Therefore, after a task fails, it can restart from the last checkpoint and re-consume data buffered in the upstream task.

I will update the FLIP as described above. Would this address your concern?


I could see a power setting like:
        `execution.backlog.use-full-batch-mode-on-start (default false)`

I am not sure I fully understand this config or its motivation. Can you help explain the exact semantics of this config?

that would override any heuristic of switching to backlog if someone is submitting a new job that starts with `isBacklog=true`.

Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow up with switching back and forth?

Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.

Best,
Dong


I'm looking forward to hearing/reading your thoughts.

Best,
Piotrek


On Wed, Jul 12, 2023 at 12:38 Jing Ge <j...@ververica.com.invalid> wrote:
Hi Dong,

Thanks for your reply!

Best regards,
Jing

On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
Hi Jing,

Thanks for the comments. Please see my reply inline.

On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Dong,

Thanks for the clarification. Now it is clear to me. I got additional noob questions wrt the internal sorter.

1. when to call setter to set the internalSorterSupported to be true?

The developer of the operator class (i.e. those classes which implement `StreamOperator`) should override the `#getOperatorAttributes()` API to set internalSorterSupported to true, if he/she decides to sort records internally in the operator.
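For illustration, based on the OperatorAttributes/OperatorAttributesBuilder classes proposed in this FLIP (so the exact signatures are still subject to change), an operator opting in might look roughly like:

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Sketch against the proposed FLIP-327 API; OperatorAttributes and
    // OperatorAttributesBuilder are the classes proposed in this FLIP.
    public class MySortingOperator extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<Long, Long> {

        @Override
        public OperatorAttributes getOperatorAttributes() {
            // Declare that this operator sorts its input internally during backlog processing.
            return new OperatorAttributesBuilder()
                    .setInternalSorterSupported(true)
                    .build();
        }

        @Override
        public void processElement(StreamRecord<Long> element) throws Exception {
            // Operator logic; during backlog it would feed records into its internal sorter.
            output.collect(element);
        }
    }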


2. *"For those operators whose throughput can be considerably improved with an internal sorter, update it to take advantage of the internal sorter when its input has isBacklog=true.*
*Typically, operators that involve aggregation operation (e.g. join, cogroup, aggregate) on keyed inputs can benefit from using an internal sorter."*

*"The operator that performs CoGroup operation will instantiate two internal sorter to sorts records from its two inputs separately. Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead."*

According to the performance test, it seems that internal sorter has better performance than external sorter. Is it possible to make those operators that can benefit from it use internal sorter by default?

Yes, it is possible. After this FLIP is done, users can use DataStream#coGroup with EndOfStreamWindows as the window assigner to co-group two streams effectively in a batch manner. An operator that uses an internal sorter will be used to perform the co-group operation. There is no need for users of the DataStream API to explicitly know or set the internal sorter in any way.

In the future, we plan to incrementally optimize other aggregation operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows is used as the window assigner.
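A usage sketch of that pattern (EndOfStreamWindows is the window assigner proposed in the related FLIPs, so treat this as illustrative rather than final API; its import is omitted):

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class CoGroupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<Long, String>> left =
                    env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
            DataStream<Tuple2<Long, Integer>> right =
                    env.fromElements(Tuple2.of(1L, 10), Tuple2.of(2L, 20));

            left.coGroup(right)
                    .where(t -> t.f0, Types.LONG)
                    .equalTo(t -> t.f0, Types.LONG)
                    // Proposed assigner: results are emitted once, when all input is consumed.
                    .window(EndOfStreamWindows.get())
                    .apply(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, String>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<Long, String>> l,
                                            Iterable<Tuple2<Long, Integer>> r,
                                            Collector<String> out) {
                            out.collect(l.toString() + " | " + r.toString());
                        }
                    })
                    .print();

            env.execute("cogroup-sketch");
        }
    }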

Best,
Dong


Best regards,
Jing


On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Jing,

Thank you for the comments! Please see my reply inline.

On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Dong,

Thanks for the proposal! The FLIP is already in good shape. I got some NIT questions.

1. It is a little bit weird to write the hint right after the motivation that some features have been moved to FLIP-331, because at that time, readers don't know the context about what features it means. I would suggest moving the note to the beginning of the "Public interfaces" section.

Given that the reviewer who commented on this email thread before I refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler to just remove any mention of FLIP-331. I have updated the FLIP accordingly.

2. It is also a little bit weird to describe all behaviour changes at first but only focus on one single feature, i.e. how to implement internalSorterSupported. TBH, I was lost while I was reading the Public interfaces. Maybe change the FLIP title? Another option could be to write a short summary of all features and point out that this FLIP will only focus on the internalSorterSupported feature. Others could be found in FLIP-331. WDYT?

Conceptually, the purpose of this FLIP is to allow a stream mode job to run parts of the topology in batch mode so that it can apply optimizations/computations that can not be used together with checkpointing (and thus not usable in stream mode). Although internal sorter is the only optimization immediately supported in this FLIP, this FLIP lays the foundation to support other optimizations in the future, such as using GPU to process a bounded stream of records.

Therefore, I find it better to keep the current title rather than limiting the scope to internal sorter. What do you think?



3. There should be a typo at 4) Checkpoint and failover strategy -> Mixed mode ->

   - If any task fails when isBacklog=false true, this task is restarted to re-process its input from the beginning.

Thank you for catching this issue. It is fixed now.

Best,
Dong



Best regards
Jing


On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for your comments! Please see my reply inline.

On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi Dong,

I have a couple of questions.

Could you explain why those properties

    @Nullable private Boolean isOutputOnEOF = null;
    @Nullable private Boolean isOutputOnCheckpoint = null;
    @Nullable private Boolean isInternalSorterSupported = null;

must be `@Nullable`, instead of having the default value set to `false`?
By initializing these private variables in OperatorAttributesBuilder as null, we can implement `OperatorAttributesBuilder#build()` in such a way that it can print DEBUG level logging to say "isOutputOnCheckpoint is not explicitly set". This can help user/SRE debug performance issues (or lack of the expected optimization) due to operators not explicitly setting the right operator attribute.

For example, we might want a job to always use the longer checkpointing interval (i.e. execution.checkpointing.interval-during-backlog) if all running operators have isOutputOnCheckpoint==false, and use the short checkpointing interval otherwise. If a user has explicitly configured the execution.checkpointing.interval-during-backlog but the two-phase commit sink library has not been upgraded to set isOutputOnCheckpoint=true, then the job will end up using the long checkpointing interval, and it will be useful to figure out what is going wrong in this case by checking the log.

Note that the default value of these fields of the OperatorAttributes instance built by OperatorAttributesBuilder will still be false. The following is mentioned in the Java doc of `OperatorAttributesBuilder#build()`:

/**
 * If any operator attribute is null, we will log it at DEBUG level and use the following
 * default values.
 * - isOutputOnEOF defaults to false
 * - isOutputOnCheckpoint defaults to false
 * - isInternalSorterSupported defaults to false
 */
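To illustrate the behavior described above, a rough sketch of how such a build() could implement the defaulting plus DEBUG logging (illustrative only, not the actual implementation; the OperatorAttributes constructor is assumed):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch only; the real OperatorAttributes constructor/fields may differ.
    public class OperatorAttributesBuilderSketch {

        private static final Logger LOG =
                LoggerFactory.getLogger(OperatorAttributesBuilderSketch.class);

        private Boolean isOutputOnEOF = null;
        private Boolean isOutputOnCheckpoint = null;
        private Boolean isInternalSorterSupported = null;

        public OperatorAttributes build() {
            return new OperatorAttributes(
                    defaultIfNull(isOutputOnEOF, "isOutputOnEOF"),
                    defaultIfNull(isOutputOnCheckpoint, "isOutputOnCheckpoint"),
                    defaultIfNull(isInternalSorterSupported, "isInternalSorterSupported"));
        }

        private static boolean defaultIfNull(Boolean value, String name) {
            if (value == null) {
                LOG.debug("{} is not explicitly set, defaulting to false.", name);
                return false;
            }
            return value;
        }
    }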


Second question, have you thought about cases where someone is either bootstrapping from a streaming source like Kafka or simply trying to catch up after a long period of downtime in a purely streaming job? Generally speaking, cases where the user doesn't care about latency in the catch up phase, regardless of whether the source is bounded or unbounded, but wants to process the data as fast as possible, and then switch dynamically to real time processing?

Yes, I have thought about this. We should allow this job to effectively run in batch mode when the job is in the catch-up phase. FLIP-327 is actually an important step toward addressing this use-case.

In order to address the above use-case, all we need is a way for the source operator (e.g. Kafka) to tell the Flink runtime (via IsProcessingBacklog) whether it is in the catch-up phase.

Since every Kafka message has an event timestamp, we can allow users to specify a job-level config such as backlog-watermark-lag-threshold, and consider a Kafka Source to have IsProcessingBacklog=true if system_time - watermark > backlog-watermark-lag-threshold. This effectively allows us to determine whether Kafka is in the catch-up phase.

Once we have this capability (I plan to work on this in FLIP-328), we can directly use the features proposed in FLIP-325 and FLIP-327 to optimize the above use-case.

What do you think?

Best,
Dong


Best,
Piotrek

On Sun, Jul 2, 2023 at 16:15 Dong Lin <lindon...@gmail.com> wrote:
Hi all,

I am opening this thread to discuss FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data. The design doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data

This FLIP enables a Flink job to initially operate in batch mode, achieving high throughput while processing records that do not require low processing latency. Subsequently, the job can seamlessly transition to stream mode for processing real-time records with low latency. Importantly, the same state can be utilized before and after this mode switch, making it particularly valuable when users wish to bootstrap the job's state using historical data.

We would greatly appreciate any comments or feedback you may have on this proposal.

Cheers,
Dong
