[ 
https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393889&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393889
 ]

ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Feb/20 01:30
            Start Date: 27/Feb/20 01:30
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper
URL: https://github.com/apache/beam/pull/10897#discussion_r384866125
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
 ##########
 @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException {
       }
     }
   }
+
+  /**
+   * A splittable {@link DoFn} which executes an {@link UnboundedSource}.
+   *
+   * <p>We model the element as the original source and the restriction as a 
pair of the sub-source
+   * and its {@link CheckpointMark}. This allows us to split the sub-source 
over and over as long as
+   * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} 
since it does not
+   * maintain any state.
+   */
+  // TODO: Support reporting the watermark, currently the watermark never 
advances.
+  @UnboundedPerElement
+  static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends 
CheckpointMark>
+      extends DoFn<UnboundedSource<OutputT, CheckpointT>, 
ValueWithRecordId<OutputT>> {
+
+    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+    private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
+    private final Coder<CheckpointT> restrictionCoder;
+
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) 
{
+      this.restrictionCoder = restrictionCoder;
+    }
+
+    @GetInitialRestriction
+    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> 
initialRestriction(
+        @Element UnboundedSource<OutputT, CheckpointT> element) {
+      return KV.of(element, null);
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> 
restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return 1;
+      }
+
+      UnboundedReader<OutputT> reader =
+          restriction.getKey().createReader(pipelineOptions, 
restriction.getValue());
+      long size = reader.getSplitBacklogBytes();
+      if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+        return size;
+      }
+      // TODO: Support "global" backlog reporting
+      // size = reader.getTotalBacklogBytes();
+      // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+      //   return size;
+      // }
+      return 1;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> 
restriction,
+        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> 
receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      // The empty unbounded source is trivially done and hence we don't need 
to output any splits
+      // for it.
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return;
+      }
+
+      // The UnboundedSource API does not support splitting after a meaningful 
checkpoint mark has
+      // been created.
+      if (restriction.getValue() != null
+          && !(restriction.getValue()
+              instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
+        receiver.output(restriction);
+      }
+
+      try {
+        for (UnboundedSource<OutputT, CheckpointT> split :
+            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, 
pipelineOptions)) {
+          receiver.output(KV.of(split, null));
+        }
+      } catch (Exception e) {
+        receiver.output(restriction);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<
+            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, 
UnboundedSourceValue<OutputT>[]>
+        restrictionTracker(
+            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, 
CheckpointT> restriction,
+            PipelineOptions pipelineOptions) {
+      return new UnboundedSourceAsSDFRestrictionTracker(restriction, 
pipelineOptions);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        RestrictionTracker<
+                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, 
UnboundedSourceValue[]>
+            tracker,
+        OutputReceiver<ValueWithRecordId<OutputT>> receiver,
+        BundleFinalizer bundleFinalizer)
+        throws IOException {
+      UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
+      while (tracker.tryClaim(out)) {
+        receiver.outputWithTimestamp(
+            new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), 
out[0].getTimestamp());
+      }
+
+      // Add the checkpoint mark to be finalized if the checkpoint mark isn't 
trivial.
+      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> 
currentRestriction =
+          tracker.currentRestriction();
+      if (currentRestriction.getValue() != null
+          && !(tracker.currentRestriction().getValue() instanceof 
NoopCheckpointMark)) {
+        bundleFinalizer.afterBundleCommit(
+            
Instant.now().plus(Duration.standardMinutes(DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS)),
+            currentRestriction.getValue()::finalizeCheckpoint);
+      }
+
+      // If we have been split/checkpoint by a runner, the tracker will have 
been updated to the
+      // empty source and we will return stop. Otherwise the unbounded source 
has only temporarily
+      // run out of work.
+      if (tracker.currentRestriction().getKey() instanceof 
EmptyUnboundedSource) {
+        return ProcessContinuation.stop();
+      }
+      return ProcessContinuation.resume();
+    }
+
+    @GetRestrictionCoder
+    public Coder<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> 
restrictionCoder() {
+      return KvCoder.of(
+          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, 
CheckpointT>>() {}),
+          NullableCoder.of(restrictionCoder));
+    }
+
+    /**
+     * A named tuple representing all the values we need to pass between the 
{@link UnboundedReader}
+     * and the {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement 
@ProcessElement} method of
+     * the splittable DoFn.
+     */
+    @AutoValue
+    abstract static class UnboundedSourceValue<T> {
+      public static <T> UnboundedSourceValue<T> create(
+          byte[] id, T value, Instant timestamp, Instant watermark) {
+        return new 
AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<T>(
+            id, value, timestamp, watermark);
+      }
+
+      @SuppressWarnings("mutable")
+      public abstract byte[] getId();
+
+      public abstract T getValue();
+
+      public abstract Instant getTimestamp();
+
+      public abstract Instant getWatermark();
 
 Review comment:
   The UnboundedSource can report a watermark separately from the element timestamp.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 393889)
    Time Spent: 21h 10m  (was: 21h)

> Fn API SDF support
> ------------------
>
>                 Key: BEAM-2939
>                 URL: https://issues.apache.org/jira/browse/BEAM-2939
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Henning Rohde
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 21h 10m
>  Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to