[ 
https://issues.apache.org/jira/browse/BEAM-5605?focusedWorklogId=385434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385434
 ]

ASF GitHub Bot logged work on BEAM-5605:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Feb/20 20:49
            Start Date: 11/Feb/20 20:49
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #10576: [BEAM-5605] Convert all BoundedSources to SplittableDoFns when using beam_fn_api experiment.
URL: https://github.com/apache/beam/pull/10576#discussion_r377889999
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
 ##########
 @@ -177,4 +205,128 @@ public void populateDisplayData(DisplayData.Builder builder) {
           .include("source", source);
     }
   }
+
+  /**
+   * A splittable {@link DoFn} which executes a {@link BoundedSource}.
+   *
+   * <p>We model the element as the original source and the restriction as the sub-source. This
+   * allows us to split the sub-source over and over yet still receive "source" objects as inputs.
+   */
+  static class BoundedSourceAsSDFWrapperFn<T> extends DoFn<BoundedSource<T>, T> {
+    private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20);
+
+    @GetInitialRestriction
+    public BoundedSource<T> initialRestriction(@Element BoundedSource<T> element) {
+      return element;
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction BoundedSource<T> restriction, PipelineOptions pipelineOptions)
+        throws Exception {
+      return restriction.getEstimatedSizeBytes(pipelineOptions);
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction BoundedSource<T> restriction,
+        OutputReceiver<BoundedSource<T>> receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      for (BoundedSource<T> split :
+          restriction.split(DEFAULT_DESIRED_BUNDLE_SIZE_BYTES, pipelineOptions)) {
+        receiver.output(split);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<BoundedSource<T>, Object[]> restrictionTracker(
+        @Restriction BoundedSource<T> restriction, PipelineOptions pipelineOptions) {
+      return new BoundedSourceAsSDFRestrictionTracker<>(restriction, pipelineOptions);
+    }
+
+    @ProcessElement
+    public void processElement(
+        RestrictionTracker<BoundedSource<T>, Object[]> tracker, OutputReceiver<T> receiver)
+        throws IOException {
+      Object[] out = new Object[1];
+      while (tracker.tryClaim(out)) {
+        receiver.output((T) out[0]);
+      }
+    }
+
+    @GetRestrictionCoder
+    public Coder<BoundedSource<T>> restrictionCoder() {
+      return SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {});
+    }
+
+    /**
+     * A fake restriction tracker which adapts to the {@link BoundedSource} API. The restriction
+     * object is used to advance the underlying source and to "return" the current element.
+     */
+    private static class BoundedSourceAsSDFRestrictionTracker<T>
 
 Review comment:
   :clap: 
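
 Note on the quoted hunk: the tracker Javadoc says the claimed position object is used both to advance the underlying source and to "return" the current element, which is why processElement passes a one-slot Object[] to tryClaim. The plain-Java sketch below illustrates only that hand-off and does not use Beam. ArrayHolderTracker and ClaimLoopSketch are hypothetical names, and a java.util.Iterator stands in for the source reader; this is an assumption for illustration, not the implementation in the pull request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical stand-in for a tracker whose tryClaim both advances the
 * underlying reader and hands the current element back to the caller.
 * An Iterator plays the role of the source reader (assumption).
 */
final class ArrayHolderTracker<T> {
  private final Iterator<T> reader;

  ArrayHolderTracker(Iterable<T> source) {
    this.reader = source.iterator();
  }

  /**
   * Tries to advance. On success the next element is stored in out[0]
   * and true is returned; once the reader is exhausted, returns false
   * and leaves out untouched.
   */
  boolean tryClaim(Object[] out) {
    if (!reader.hasNext()) {
      return false;
    }
    out[0] = reader.next();
    return true;
  }
}

/** Mirrors the shape of the claim loop in the quoted processElement. */
final class ClaimLoopSketch {
  @SuppressWarnings("unchecked")
  static <T> List<T> drain(ArrayHolderTracker<T> tracker) {
    List<T> results = new ArrayList<>();
    Object[] out = new Object[1]; // one-slot holder reused for every claim
    while (tracker.tryClaim(out)) {
      results.add((T) out[0]); // unchecked cast, as in the quoted diff
    }
    return results;
  }
}
```

 Draining a tracker built over a list yields the list's elements in order. The unchecked cast is the cost of using Object[] as the position type.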
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 385434)
    Time Spent: 12h 20m  (was: 12h 10m)

> Support Portable SplittableDoFn for batch
> -----------------------------------------
>
>                 Key: BEAM-5605
>                 URL: https://issues.apache.org/jira/browse/BEAM-5605
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Scott Wegner
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> Roll-up item tracking work towards supporting portable SplittableDoFn for batch



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
