[
https://issues.apache.org/jira/browse/BEAM-3499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338590#comment-16338590
]
ASF GitHub Bot commented on BEAM-3499:
--------------------------------------
jkff opened a new pull request #4483: [BEAM-3499, BEAM-2607] Gives the runner
access to positions of SDF claimed blocks
URL: https://github.com/apache/beam/pull/4483
This addresses the following issues:
* https://issues.apache.org/jira/browse/BEAM-3499 Watch can make no progress
if a single poll takes more than checkpoint interval
* https://issues.apache.org/jira/browse/BEAM-2607 Enforce that SDF must
return stop() after a failed tryClaim() call
The former is the primary motivation for this PR. This PR changes SDF
checkpointing timer countdown to start from the first claimed block, rather
than from the beginning of `@ProcessElement`. This requires giving the runner
visibility into claimed blocks. Such visibility enables fixing BEAM-2607 as
well. It also is a required part of implementing SDF splitting over Fn API
(tracked separately).
This PR also, of course, changes the Watch transform to the new API; and,
while we're at it, does some related improvements:
* Compresses Watch.GrowthState using Snappy. E.g. with 100k files, the
encoded state is about 3MB instead of 8MB. Compressing it much more is
difficult because the state includes uncompressible hashes. To address this,
one must shard the filepattern, or implement the improvements suggested in
https://issues.apache.org/jira/browse/BEAM-2680 .
* Makes direct runner create a clone of state cells - I did this mainly
because I noticed that GrowthStateCoder was never called on the Watch state,
which risks missing coder bugs when testing with direct runner.
This PR is update-incompatible for users of the Watch transform, e.g.
FileIO.match().continuously(). This is an experimental and very recent
transform, so I'm going to ignore the incompatibility. It also requires a
traditional Dataflow worker dance to get the worker container in sync with
these runners-core changes - I'll perform that when the rest of the PR is
approved.
R: @tgroh @chamikaramj
CC: @kennknowles @reuvenlax
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Watch can make no progress if a single poll takes more than checkpoint
> interval
> -------------------------------------------------------------------------------
>
> Key: BEAM-3499
> URL: https://issues.apache.org/jira/browse/BEAM-3499
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: Major
>
> E.g. when using it to poll a filepattern with hundreds of thousands of files,
> a single poll may take >10 seconds (default checkpoint interval in
> OutputAndTimeBoundedSplittableProcessElementInvoker). Because of that, the
> tracker (GrowthTracker) gets checkpointed before anything is added to it,
> i.e. before
> [https://github.com/apache/beam/blob/0d918b7cab8c4ccb2b5e050501327912161d40a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L727,]
> at a moment when it doesn't contain any useful information, so the residual
> checkpoint state is as empty as the initial one. When we resume from the
> residual checkpoint, the situation simply repeats - until we get lucky enough
> to either take <10s to poll, or to not be asked to checkpoint for >10s (e.g.
> cause the checkpointing thread isn't scheduled).
> One possible fix to this is to change the SDF checkpointing strategy to have
> a progress guarantee: e.g., start counting time from the moment the first
> block is claimed, or allow the tracker to refuse checkpointing if nothing is
> claimed yet, or something like that.
>
> A workaround for users of this (primarily via FileIO.match().continuously())
> is to shard their filepattern into a set of finer-granularity filepatterns
> matching fewer files, so that each match call takes less than 10 seconds.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)