[
https://issues.apache.org/jira/browse/BEAM-7192?focusedWorklogId=236752&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236752
]
ASF GitHub Bot logged work on BEAM-7192:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/19 09:23
Start Date: 03/May/19 09:23
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #8441: [BEAM-7192] Fix partitioning of buffered elements during checkpointing
URL: https://github.com/apache/beam/pull/8441#discussion_r280707456
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -790,49 +792,55 @@ private void setCurrentOutputWatermark(long currentOutputWatermark) {
/** Factory for creating an {@link BufferedOutputManager} from a Flink {@link Output}. */
interface OutputManagerFactory<OutputT> extends Serializable {
BufferedOutputManager<OutputT> create(
- Output<StreamRecord<WindowedValue<OutputT>>> output, StateInternals stateInternals);
+ Output<StreamRecord<WindowedValue<OutputT>>> output,
+ Lock bufferLock,
+ @Nullable OperatorStateBackend operatorStateBackend,
+ @Nullable KeyedStateBackend keyedStateBackend,
+ @Nullable KeySelector keySelector)
+ throws Exception;
}
/**
* A {@link DoFnRunners.OutputManager} that can buffer its outputs. Uses {@link
Review comment:
Could you add a comment explaining why we need to buffer, and noting that this will become unnecessary once `prepareSnapshotPreBarrier` can be used?
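The kind of comment the reviewer asks for might look like the Javadoc below. This is a minimal, hypothetical sketch, not Beam's `BufferedOutputManager`: the class name `BufferingOutput` and the methods `startBuffering`, `output`, and `flush` are illustrative assumptions. It takes a `Lock`, mirroring the new `Lock bufferLock` parameter in the diff, and keeps the buffer in memory only; the factory in the diff also receives state backends, presumably so the real buffer can be stored in checkpointed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified output that buffers while a checkpoint is in progress.
 *
 * <p>Why we buffer: when a checkpoint is taken, the current bundle is finished only after the
 * checkpoint barrier has already been sent downstream. Emitting the bundle's elements at that
 * point would put them behind the barrier and violate barrier alignment, so they are held back
 * and emitted once the checkpoint is complete.
 *
 * <p>This becomes unnecessary once the bundle can be finished in prepareSnapshotPreBarrier,
 * which runs before the barrier is sent.
 */
final class BufferingOutput<T> {
  private final Consumer<T> downstream;
  private final Lock bufferLock;
  private final List<T> buffer = new ArrayList<>();
  private boolean buffering = false;

  BufferingOutput(Consumer<T> downstream, Lock bufferLock) {
    this.downstream = downstream;
    this.bufferLock = bufferLock;
  }

  /** Called when the snapshot starts, i.e. after the barrier has gone downstream. */
  void startBuffering() {
    bufferLock.lock();
    try {
      buffering = true;
    } finally {
      bufferLock.unlock();
    }
  }

  /** Forwards the element directly, or buffers it while a checkpoint is in progress. */
  void output(T element) {
    bufferLock.lock();
    try {
      if (buffering) {
        buffer.add(element);
      } else {
        downstream.accept(element);
      }
    } finally {
      bufferLock.unlock();
    }
  }

  /** Called after the checkpoint: emits held-back elements in order and resumes forwarding. */
  void flush() {
    bufferLock.lock();
    try {
      buffering = false;
      for (T element : buffer) {
        downstream.accept(element);
      }
      buffer.clear();
    } finally {
      bufferLock.unlock();
    }
  }

  int bufferedCount() {
    bufferLock.lock();
    try {
      return buffer.size();
    } finally {
      bufferLock.unlock();
    }
  }
}
```

Between `startBuffering()` and `flush()`, `output` only appends to the buffer; after `flush()` it forwards directly again, and the held-back elements keep their original order.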
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 236752)
Time Spent: 2h 20m (was: 2h 10m)
> Elements buffered during checkpointing may not be partitioned correctly
> -----------------------------------------------------------------------
>
> Key: BEAM-7192
> URL: https://issues.apache.org/jira/browse/BEAM-7192
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.13.0
>
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> When a Flink checkpoint is taken, the current bundle is finalized. This
> happens after the checkpoint barrier has already been sent downstream, so
> emitting elements at this point would violate checkpoint barrier
> alignment.
> Elements emitted during checkpointing are therefore buffered until the
> checkpoint is complete. We should ensure that they are keyed correctly and
> that emitting the buffered elements does not interfere with concurrent
> state requests (when running portable pipelines).
>
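The two requirements in the issue description above, keying buffered elements correctly and not interfering with concurrent state requests, can be illustrated with the hypothetical sketch below. `KeyedBuffer`, `partition`, `flush`, and `withKey` are illustrative names, not Beam or Flink APIs. Elements are grouped by the key a key selector extracts (compare the `KeySelector` parameter in the diff), so on restore each element can go to the subtask owning its key. Ownership here is a simplified hash modulo, not Flink's key-group assignment. The `currentKey` field stands in for the keyed state backend's current key: flushing and state requests both change it, so both take the same lock.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical model of a buffer that keeps elements grouped by key. */
final class KeyedBuffer<K, T> {
  private final Function<T, K> keySelector;
  private final Lock bufferLock;
  private final Map<K, List<T>> buffer = new LinkedHashMap<>();
  // Stand-in for the keyed state backend's current key.
  private K currentKey;

  KeyedBuffer(Function<T, K> keySelector, Lock bufferLock) {
    this.keySelector = keySelector;
    this.bufferLock = bufferLock;
  }

  void add(T element) {
    bufferLock.lock();
    try {
      buffer.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>()).add(element);
    } finally {
      bufferLock.unlock();
    }
  }

  /** Splits buffered elements across subtasks so each key lands wholly on one owner. */
  List<List<T>> partition(int parallelism) {
    bufferLock.lock();
    try {
      List<List<T>> parts = new ArrayList<>();
      for (int i = 0; i < parallelism; i++) {
        parts.add(new ArrayList<>());
      }
      for (Map.Entry<K, List<T>> entry : buffer.entrySet()) {
        // Simplified ownership rule; Flink actually assigns keys via key groups.
        int owner = Math.floorMod(entry.getKey().hashCode(), parallelism);
        parts.get(owner).addAll(entry.getValue());
      }
      return parts;
    } finally {
      bufferLock.unlock();
    }
  }

  /** Emits buffered elements with the current key set to each element's key. */
  void flush(BiConsumer<K, T> downstream) {
    bufferLock.lock();
    try {
      K previous = currentKey;
      for (Map.Entry<K, List<T>> entry : buffer.entrySet()) {
        currentKey = entry.getKey();
        for (T element : entry.getValue()) {
          downstream.accept(currentKey, element);
        }
      }
      buffer.clear();
      currentKey = previous;
    } finally {
      bufferLock.unlock();
    }
  }

  /** A state request sets the current key under the same lock, so it cannot interleave with flush. */
  <R> R withKey(K key, Function<K, R> stateAccess) {
    bufferLock.lock();
    try {
      K previous = currentKey;
      currentKey = key;
      try {
        return stateAccess.apply(currentKey);
      } finally {
        currentKey = previous;
      }
    } finally {
      bufferLock.unlock();
    }
  }

  K getCurrentKey() {
    bufferLock.lock();
    try {
      return currentKey;
    } finally {
      bufferLock.unlock();
    }
  }
}
```

Holding the lock for the whole flush is the simple choice here: a concurrent state request waits rather than observing a current key that belongs to a buffered element.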
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)