[ 
https://issues.apache.org/jira/browse/BEAM-3499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov updated BEAM-3499:
-----------------------------------
    Description: 
E.g. when using Watch to poll a filepattern with hundreds of thousands of 
files, a single poll may take more than 10 seconds (the default checkpoint 
interval in OutputAndTimeBoundedSplittableProcessElementInvoker). Because of 
that, the tracker (GrowthTracker) gets checkpointed before anything is added 
to it, i.e. before 
[https://github.com/apache/beam/blob/0d918b7cab8c4ccb2b5e050501327912161d40a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L727],
 at a moment when it doesn't contain any useful information, so the residual 
checkpoint state is as empty as the initial one. When we resume from the 
residual checkpoint, the situation simply repeats, until we get lucky enough 
to either take <10s to poll, or to not be asked to checkpoint for >10s (e.g. 
because the checkpointing thread isn't scheduled).

One possible fix is to give the SDF checkpointing strategy a progress 
guarantee: e.g., start counting time from the moment the first block is 
claimed, or allow the tracker to refuse checkpointing if nothing has been 
claimed yet, or some similar mechanism.
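
To make the difference between the two timer policies concrete, here is a 
minimal, deterministic toy model in plain Java. It is not Beam code: the class 
CheckpointProgressSketch, the enum TimerStart and the method 
attemptsUntilFirstClaim are hypothetical names made up for illustration, and 
the model ignores the "get lucky" scheduling case mentioned above. It only 
assumes that a checkpoint taken before the first claim leaves a residual as 
empty as the initial state.

```java
/** Toy model of the checkpointing race described above; this is not Beam code. */
public class CheckpointProgressSketch {

  /** Hypothetical knob: the moment at which the checkpoint timer starts counting. */
  public enum TimerStart { AT_INVOCATION, AT_FIRST_CLAIM }

  /**
   * Returns the number of attempts (initial call plus resumes) needed before the
   * poll result is claimed, or -1 if no attempt within maxAttempts gets that far.
   */
  public static int attemptsUntilFirstClaim(
      long pollSeconds, long checkpointSeconds, TimerStart timerStart, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // If the timer starts at invocation, it keeps running while the poll is in flight.
      boolean timerRunsDuringPoll = timerStart == TimerStart.AT_INVOCATION;
      // A poll slower than the interval is then checkpointed before anything is claimed.
      boolean checkpointedBeforeClaim = timerRunsDuringPoll && pollSeconds > checkpointSeconds;
      if (!checkpointedBeforeClaim) {
        return attempt; // the poll finished and its result was claimed
      }
      // Assumption of this model: the residual is as empty as the initial state,
      // so the next attempt repeats exactly the same poll.
    }
    return -1;
  }

  public static void main(String[] args) {
    // 12 s poll, 10 s interval, timer started at invocation: never progresses.
    System.out.println(attemptsUntilFirstClaim(12, 10, TimerStart.AT_INVOCATION, 1000));
    // Same poll, timer started at the first claim: progresses on the first attempt.
    System.out.println(attemptsUntilFirstClaim(12, 10, TimerStart.AT_FIRST_CLAIM, 1000));
    // 8 s poll fits inside the interval, so either policy progresses.
    System.out.println(attemptsUntilFirstClaim(8, 10, TimerStart.AT_INVOCATION, 1000));
  }
}
```

In this model, starting the timer at the first claim is what provides the 
progress guarantee: every attempt claims at least one poll result before it 
can be checkpointed.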

 

A workaround for users of Watch (primarily via FileIO.match().continuously()) 
is to shard their filepattern into a set of finer-granularity filepatterns 
matching fewer files, so that each match call takes less than 10 seconds.
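
One way to shard a filepattern, sketched in plain Java under stated 
assumptions: the helper below (FilepatternSharder and shardByNextChar are 
hypothetical names, not part of Beam) inserts one character from a given 
alphabet in front of the first '*' wildcard. The resulting patterns cover the 
same files as the original only if every matching file name continues with a 
character from that alphabet at that position (e.g. hex-named files).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for the workaround above; not part of Beam. */
public class FilepatternSharder {

  /**
   * Splits a pattern such as "gs://bucket/logs/*" into one pattern per character
   * of alphabet, e.g. "gs://bucket/logs/0*", "gs://bucket/logs/1*", and so on.
   * Assumes the text matched by the first '*' always starts with a character
   * from alphabet; files that do not are no longer matched by any shard.
   */
  public static List<String> shardByNextChar(String filepattern, String alphabet) {
    int star = filepattern.indexOf('*');
    if (star < 0) {
      throw new IllegalArgumentException("expected a '*' wildcard in: " + filepattern);
    }
    String head = filepattern.substring(0, star); // everything before the wildcard
    String tail = filepattern.substring(star);    // the wildcard and what follows
    List<String> shards = new ArrayList<>();
    for (char c : alphabet.toCharArray()) {
      shards.add(head + c + tail);
    }
    return shards;
  }

  public static void main(String[] args) {
    for (String shard : shardByNextChar("gs://bucket/logs/*.json", "0123456789abcdef")) {
      System.out.println(shard);
    }
  }
}
```

Each resulting pattern could then be watched separately, for example by 
feeding the list to FileIO.matchAll().continuously(...) instead of passing a 
single pattern to FileIO.match(). Whether each shard polls in under 10 seconds 
depends on how evenly the files are spread across the alphabet.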

  was:
E.g. when using it to poll a filepattern with hundreds of thousands of files, a 
single poll may take >10 seconds (default checkpoint interval in 
OutputAndTimeBoundedSplittableProcessElementInvoker). Because of that, the 
tracker (GrowthTracker) gets checkpointed before anything is added to it, i.e. 
before 
[https://github.com/apache/beam/blob/0d918b7cab8c4ccb2b5e050501327912161d40a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L727,]
 at a moment when it doesn't contain any useful information, so the residual 
checkpoint state is as empty as the initial one.

One possible fix to this is to change the SDF checkpointing strategy to have a 
progress guarantee: e.g., start counting time from the moment the first block 
is claimed, or allow the tracker to refuse checkpointing if nothing is claimed 
yet, or something like that.

 

A workaround for users of this (primarily via FileIO.match().continuously()) is 
to shard their filepattern into a set of finer-granularity filepatterns 
matching fewer files, so that each match call takes less than 10 seconds.


> Watch can make no progress if a single poll takes more than checkpoint 
> interval
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-3499
>                 URL: https://issues.apache.org/jira/browse/BEAM-3499
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Major
>
> E.g. when using Watch to poll a filepattern with hundreds of thousands of 
> files, a single poll may take more than 10 seconds (the default checkpoint 
> interval in OutputAndTimeBoundedSplittableProcessElementInvoker). Because of 
> that, the tracker (GrowthTracker) gets checkpointed before anything is added 
> to it, i.e. before 
> [https://github.com/apache/beam/blob/0d918b7cab8c4ccb2b5e050501327912161d40a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L727],
>  at a moment when it doesn't contain any useful information, so the residual 
> checkpoint state is as empty as the initial one. When we resume from the 
> residual checkpoint, the situation simply repeats, until we get lucky enough 
> to either take <10s to poll, or to not be asked to checkpoint for >10s (e.g. 
> because the checkpointing thread isn't scheduled).
> One possible fix is to give the SDF checkpointing strategy a progress 
> guarantee: e.g., start counting time from the moment the first block is 
> claimed, or allow the tracker to refuse checkpointing if nothing has been 
> claimed yet, or some similar mechanism.
>  
> A workaround for users of this (primarily via FileIO.match().continuously()) 
> is to shard their filepattern into a set of finer-granularity filepatterns 
> matching fewer files, so that each match call takes less than 10 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
