[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165057 ]
ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Nov/18 18:48
Start Date: 12/Nov/18 18:48
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6981:
[BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r232771920
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
@Override
protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
DoFnRunner<InputT, OutputT> wrappedRunner) {
- return new SdkHarnessDoFnRunner();
+ sdkHarnessRunner =
+ new SdkHarnessDoFnRunner<>(
+ executableStage.getInputPCollection().getId(),
+ stageBundleFactory,
+ stateRequestHandler,
+ progressHandler,
+ outputManager,
+ outputMap,
+ executableStage.getTimers(),
+ (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
+ (WindowedValue<InputT> key, TimerInternals.TimerData timerData) -> {
+ try {
+ synchronized (getKeyedStateBackend()) {
+ setCurrentKey(keySelector.getKey(key));
+ timerInternals.setTimer(timerData);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't set key", e);
+ }
+ },
+ () -> {
+ synchronized (getKeyedStateBackend()) {
+ ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+ @SuppressWarnings("ByteBufferBackingArray")
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+ try {
+ return keyCoder.decode(byteStream);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+ }
+ }
+ });
+ return sdkHarnessRunner;
}
- private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // Due to the asynchronous communication with the SDK harness,
+ // a bundle might still be in progress and not all items have
+ // yet been received from the SDK harness. If we just set this
+ // watermark as the new output watermark, we could violate the
+ // order of the records, i.e. pending items in the SDK harness
+ // could become "late" although they were "on time".
+ //
+ // We can solve this problem using one of the following options:
+ //
+ // 1) Finish the current bundle and emit this watermark as the
+ // new output watermark. Finishing the bundle ensures that
+ // all the items have been processed by the SDK harness and
+ // received by the outputQueue (see below), where they will
+ // have been emitted to the output stream.
+ //
+ // 2) Put a hold on the output watermark for as long as the current
+ // bundle has not been finished. We have to remember to manually
+ // finish the bundle in case we receive the final watermark.
+ // To avoid latency, we should process this watermark again as
+ // soon as the current bundle is finished.
+ //
+ // Approach 1) is the easiest, yet 2) gives better throughput
+ // because 1) cuts the bundle on every watermark, whereas 2) does
+ // not. So we have implemented 2) below.
+ //
+ if (sdkHarnessRunner.isBundleInProgress()) {
+ if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ invokeFinishBundle();
+ } else {
+ // It is not safe to advance the output watermark yet, so add a hold on the current
+ // output watermark.
+ setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+ sdkHarnessRunner.setBundleFinishedCallback(
+ () -> {
+ try {
+ processWatermark(mark);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to process pushed back watermark after finished bundle.", e);
+ }
+ });
+ }
+ }
+ super.processWatermark(mark);
+ }
+
+ private static class SdkHarnessDoFnRunner<InputT, OutputT>
Review comment:
Yeah, you can probably take the argument either way. I see this in a few other
places in the Flink runner, and I see your point regarding the required
dependencies. On the other hand, these redundant constructor argument lists and
member declarations reduce readability, sometimes make it hard to find what
really matters, and overall provide little benefit when the class is internal
to the outer class and fully encapsulated. Not a blocker for me though.
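The trade-off in this comment can be made concrete with a small, hypothetical Java sketch. The names `NestedClassStyles`, `StaticRunner` and `InnerRunner` are illustrative and do not come from the Beam code base. The sketch compares two styles. The first is a static nested class that receives every dependency through its constructor. The second is a non-static inner class that reads the enclosing instance's fields directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical outer class standing in for an operator with some state.
public class NestedClassStyles {
  final String stageId = "stage-1";
  final List<String> output = new ArrayList<>();

  // Style A: static nested class. Every dependency is an explicit constructor
  // argument, so it cannot silently reach into the outer instance, at the cost
  // of repeating field declarations and constructor parameters.
  static class StaticRunner {
    private final String stageId;
    private final Consumer<String> emitter;

    StaticRunner(String stageId, Consumer<String> emitter) {
      this.stageId = stageId;
      this.emitter = emitter;
    }

    void process(String element) {
      emitter.accept(stageId + ":" + element);
    }
  }

  // Style B: non-static inner class. It reads the enclosing instance's fields
  // directly, so there is no boilerplate, but its dependencies are implicit.
  class InnerRunner {
    void process(String element) {
      output.add(stageId + ":" + element);
    }
  }

  public static void main(String[] args) {
    NestedClassStyles outer = new NestedClassStyles();
    new StaticRunner(outer.stageId, outer.output::add).process("a");
    outer.new InnerRunner().process("b");
    System.out.println(outer.output); // [stage-1:a, stage-1:b]
  }
}
```

Both runners produce the same output. The only difference is whether their dependencies appear in the constructor signature or are picked up implicitly from the enclosing instance.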
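Approach 2) from the `processWatermark` comment in the diff is to hold the watermark while a bundle is in progress and process it again once the bundle finishes. It can be sketched without any Flink or Beam dependencies. This is a simplified illustration under several assumptions:

- `WatermarkHoldSketch`, `startBundle`, `finishBundle` and `MAX_TIMESTAMP` are hypothetical stand-ins for the operator's bundle handling and for `BoundedWindow.TIMESTAMP_MAX_VALUE`.
- Watermarks are plain `long` millisecond values.
- Instead of emitting a pushed-back hold watermark, the sketch simply withholds the watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of "approach 2)": hold the watermark
// while a bundle is in progress and re-process it once the bundle finishes.
public class WatermarkHoldSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis() (assumption).
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  private boolean bundleInProgress = false;
  private Runnable bundleFinishedCallback = null;
  private long outputWatermark = Long.MIN_VALUE;
  final List<Long> emittedWatermarks = new ArrayList<>();

  void startBundle() {
    bundleInProgress = true;
  }

  void finishBundle() {
    bundleInProgress = false;
    Runnable callback = bundleFinishedCallback;
    bundleFinishedCallback = null;
    if (callback != null) {
      // Re-process the most recent watermark that was held back.
      callback.run();
    }
  }

  void processWatermark(long mark) {
    if (bundleInProgress) {
      if (mark >= MAX_TIMESTAMP) {
        // Final watermark: nothing will follow, so finish the bundle now.
        finishBundle();
      } else {
        // Not safe to advance yet; retry once the bundle is done.
        // A newer watermark simply replaces an older pending one.
        bundleFinishedCallback = () -> processWatermark(mark);
        return;
      }
    }
    if (mark > outputWatermark) {
      outputWatermark = mark;
      emittedWatermarks.add(mark);
    }
  }

  public static void main(String[] args) {
    WatermarkHoldSketch op = new WatermarkHoldSketch();
    op.processWatermark(10); // no bundle running: emitted immediately
    op.startBundle();
    op.processWatermark(20); // held
    op.processWatermark(30); // held, replaces the pending 20
    System.out.println(op.emittedWatermarks); // [10]
    op.finishBundle(); // releases the held watermark
    System.out.println(op.emittedWatermarks); // [10, 30]
  }
}
```

The diff still calls `super.processWatermark(mark)` with a pushed-back hold in place, whereas this sketch withholds emission entirely. The intent is the same in both: the output watermark does not advance while a bundle is open. The sketch leaves out Flink's own watermark bookkeeping.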
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure.
Issue Time Tracking
-------------------
Worklog Id: (was: 165057)
Time Spent: 6h 40m (was: 6.5h)
> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Luke Cwik
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Time Spent: 6h 40m
> Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)