mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512524675
##########
File path: runners/flink/job-server/flink_job_server.gradle
##########
@@ -166,23 +166,22 @@ def portableValidatesRunnerTask(String name, Boolean
streaming, Boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
if (streaming) {
+ excludeCategories
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
excludeCategories
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories
'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories
'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
} else {
+ excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+ excludeCategories
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
}
- //SplitableDoFnTests
- excludeCategories
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
- excludeCategories
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
- excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-
}
},
testFilter: {
// TODO(BEAM-10016)
excludeTestsMatching
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+ excludeTestsMatching
'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'
Review comment:
Maybe add a comment explaining why this test is excluded (e.g. a TODO with the JIRA issue, like the one above)?
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1315,7 +1316,7 @@ void processPendingProcessingTimeTimers() {
keyedStateBackend.setCurrentKey(internalTimer.getKey());
TimerData timer = internalTimer.getNamespace();
checkInvokeStartBundle();
- fireTimer(timer);
+ fireTimerInternal((ByteBuffer) internalTimer.getKey(), timer);
Review comment:
I'm assuming the previous `fireTimer(timer)` call was a bug?
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
}
}
+ /** Holds the watermark when there is an sdf timer. */
+ private void onNewSdfTimer(TimerData newTimer) {
+ Preconditions.checkState(
+
StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+ Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+ keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+ }
Review comment:
Wouldn't it make sense to integrate this check with the
`timerUsesOutputTimestamp` method?
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1451,6 +1472,8 @@ void onFiredOrDeletedTimer(TimerData timer) {
pendingTimersById.remove(getContextTimerId(timer.getTimerId(),
timer.getNamespace()));
if (timer.getDomain() == TimeDomain.EVENT_TIME) {
onRemovedEventTimer(timer);
+ } else if
(StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
+ onRemovedSdfTimer(timer);
Review comment:
This could be simplified with a single generic call to `onFiredTimer` here; see my
comment above.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -683,8 +694,24 @@ private void translateStreamingImpulse(
inputPCollectionId,
valueCoder.getClass().getSimpleName()));
}
- keyCoder = ((KvCoder) valueCoder).getKeyCoder();
- keySelector = new KvToByteBufferKeySelector(keyCoder);
+ if (stateful) {
+ keyCoder = ((KvCoder) valueCoder).getKeyCoder();
+ keySelector = new KvToByteBufferKeySelector(keyCoder);
+ } else {
+ // For an SDF, we know that the input element should be
+ // KV<KV<element, KV<restriction, watermarkState>>, size>. We are
going to use the element
+ // as the key.
+ if (!(((KvCoder) valueCoder).getKeyCoder() instanceof KvCoder)) {
+ throw new IllegalStateException(
+ String.format(
+ Locale.ENGLISH,
+ "The element coder for splittable DoFn '%s' must be
KVCoder(KvCoder, DoubleCoder) but is: %s",
+ inputPCollectionId,
+ valueCoder.getClass().getSimpleName()));
+ }
+ keyCoder = ((KvCoder) ((KvCoder)
valueCoder).getKeyCoder()).getKeyCoder();
+ keySelector = new SdfByteBufferKeySelector(keyCoder);
+ }
Review comment:
Will SDFs ever support stateful operations? If so, this would no longer work,
because elements with the same key are not guaranteed to be processed by the same
operator instance.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
}
}
+ /** Holds the watermark when there is an sdf timer. */
+ private void onNewSdfTimer(TimerData newTimer) {
+ Preconditions.checkState(
+
StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+ Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+ keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+ }
Review comment:
We could rename the method mentioned above to `onFiredTimer` and move the
output watermark hold checks into it.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1342,6 +1351,14 @@ private void onRemovedEventTimer(TimerData removedTimer)
{
}
}
+ private void onRemovedSdfTimer(TimerData removedTimer) {
+ Preconditions.checkState(
+
StateAndTimerBundleCheckpointHandler.isSdfTimer(removedTimer.getTimerId()));
+ Preconditions.checkState(timerUsesOutputTimestamp(removedTimer));
+ // Remove the watermark hold which is set for this sdf timer.
+
keyedStateInternals.removeWatermarkHoldUsage(removedTimer.getOutputTimestamp());
+ }
Review comment:
We could rename the method mentioned above to `onFiredTimer` and move the
output watermark hold checks into it.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1342,6 +1351,14 @@ private void onRemovedEventTimer(TimerData removedTimer)
{
}
}
+ private void onRemovedSdfTimer(TimerData removedTimer) {
+ Preconditions.checkState(
+
StateAndTimerBundleCheckpointHandler.isSdfTimer(removedTimer.getTimerId()));
+ Preconditions.checkState(timerUsesOutputTimestamp(removedTimer));
+ // Remove the watermark hold which is set for this sdf timer.
+
keyedStateInternals.removeWatermarkHoldUsage(removedTimer.getOutputTimestamp());
+ }
Review comment:
Wouldn't it make sense to integrate this check with the
`timerUsesOutputTimestamp` method?
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -484,6 +530,148 @@ void setTimer(Timer<?> timerElement,
TimerInternals.TimerData timerData) {
}
}
+ /**
+ * A {@link TimerInternalsFactory} for Flink operator to create a {@link
+ * StateAndTimerBundleCheckpointHandler} to handle {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+ */
+ class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT>
{
Review comment:
Maybe we should move these nested SDF classes out of `ExecutableStageDoFnOperator`.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1424,6 +1442,9 @@ private void registerTimer(TimerData timer, String
contextTimerId) throws Except
case PROCESSING_TIME:
case SYNCHRONIZED_PROCESSING_TIME:
timerService.registerProcessingTimeTimer(timer,
adjustTimestampForFlink(time));
+ if
(StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
+ onNewSdfTimer(timer);
+ }
Review comment:
Could be simplified by having a generic call here to `onFiredTimer`. See
above.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -484,6 +530,148 @@ void setTimer(Timer<?> timerElement,
TimerInternals.TimerData timerData) {
}
}
+ /**
+ * A {@link TimerInternalsFactory} for Flink operator to create a {@link
+ * StateAndTimerBundleCheckpointHandler} to handle {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+ */
+ class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT>
{
+ @Override
+ public TimerInternals timerInternalsForKey(InputT key) {
+ try {
+ ByteBuffer encodedKey =
+ (ByteBuffer)
keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+ return new SdfFlinkTimerInternals(encodedKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't get a timer internals", e);
+ }
+ }
+ }
+
+ /**
+ * A {@link TimerInternals} for rescheduling {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+ */
+ class SdfFlinkTimerInternals implements TimerInternals {
+ private final ByteBuffer key;
+
+ SdfFlinkTimerInternals(ByteBuffer key) {
+ this.key = key;
+ }
+
+ @Override
+ public void setTimer(
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ setTimer(
+ TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain));
+ }
+
+ @Override
+ public void setTimer(TimerData timerData) {
+ try {
+ try (Locker locker = Locker.locked(stateBackendLock)) {
+ getKeyedStateBackend().setCurrentKey(key);
+ timerInternals.setTimer(timerData);
+ minEventTimeTimerTimestampInCurrentBundle =
+ Math.min(
+ minEventTimeTimerTimestampInCurrentBundle,
+
adjustTimestampForFlink(timerData.getOutputTimestamp().getMillis()));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't set timer", e);
+ }
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId,
TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "It is not expected to use SdfFlinkTimerInternals to delete a
timer");
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, String
timerFamilyId) {
+ throw new UnsupportedOperationException(
+ "It is not expected to use SdfFlinkTimerInternals to delete a
timer");
+ }
+
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ throw new UnsupportedOperationException(
+ "It is not expected to use SdfFlinkTimerInternals to delete a
timer");
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return timerInternals.currentProcessingTime();
+ }
+
+ @Override
+ public @Nullable Instant currentSynchronizedProcessingTime() {
+ return timerInternals.currentSynchronizedProcessingTime();
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return timerInternals.currentInputWatermarkTime();
+ }
+
+ @Override
+ public @Nullable Instant currentOutputWatermarkTime() {
+ return timerInternals.currentOutputWatermarkTime();
+ }
+ }
+
+ /**
+ * A {@link StateInternalsFactory} for Flink operator to create a {@link
+ * StateAndTimerBundleCheckpointHandler} to handle {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+ */
+ class SdfFlinkStateInternalsFactory implements StateInternalsFactory<InputT>
{
+ @Override
+ public StateInternals stateInternalsForKey(InputT key) {
+ try {
+ ByteBuffer encodedKey =
+ (ByteBuffer)
keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+ return new SdfFlinkStateInternals(encodedKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't get a state internals", e);
+ }
+ }
+ }
+
+ /** A {@link StateInternals} for keeping {@link DelayedBundleApplication}s
as states. */
+ class SdfFlinkStateInternals implements StateInternals {
Review comment:
Maybe we should move these nested SDF classes out of `ExecutableStageDoFnOperator`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]