lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427616856



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
       wouldn' this just be a small amount of waste since we would effectively 
get an empty response?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of

Review comment:
       consider using `<ol>` and `<li>` tags in the javadoc for your ordered 
list

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.
+   *
+   * <p>This will make more queries than necessary when used with very small 
data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * <p>If you have large but sparse data (e.g. hours between consecutive 
message sendTimes) and
+   * know something about the time ranges where you have no data, consider 
using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where 
there is no data.
+   */
   public static class ListHL7v2Messages extends PTransform<PBegin, 
PCollection<HL7v2Message>> {
-    private final List<String> hl7v2Stores;
-    private final String filter;
+    private final ValueProvider<List<String>> hl7v2Stores;
+    private ValueProvider<String> filter;
+    private Duration initialSplitDuration;

Review comment:
       even if a member variable is null, it should still be final since it 
doesn't look like we mutate it locally. Same reason for other places I suggest 
to change this.
   ```suggestion
       private final ValueProvider<String> filter;
       private final Duration initialSplitDuration;
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, 
ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime 
dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration 
initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, 
ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, 
initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream 
processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of 
offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-    private final String filter;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    // These control the initial restriction split which means that the list 
of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = 
Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = 
Duration.standardHours(1);
+    private Duration initialSplitDuration;
+    private Instant from;
+    private Instant to;

Review comment:
       Can any of these be final?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, 
OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), 
DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): 
[%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) 
tracker.currentRestriction();
+      Instant startRestriction = 
Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = 
Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, 
this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, 
"sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills 
over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to 
make sure that we
+            // process all messages for this millisecond because sendTime is 
allegedly nano second
+            // resolution.
+            // 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);

Review comment:
       I agree with @pabloem that this logic is not simple to follow and I 
think you could really simplify your code if you used 
https://guava.dev/releases/21.0/api/docs/com/google/common/collect/FluentIterable.html#concat-java.lang.Iterable-
   since it would convert `Iterable<List<HL7v2Message>>` into 
`Iterable<HL7v2Message>` and only accesses the elements lazily so it wouldn't 
prefetch everything.
   
   This would allow you to not worry that `messages` are in a `page` and your 
processing multiple `pages`.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, 
ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime 
dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration 
initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, 
ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, 
initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream 
processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of 
offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-    private final String filter;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    // These control the initial restriction split which means that the list 
of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = 
Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = 
Duration.standardHours(1);

Review comment:
       nit: Might want to group your statics at the top together separate from 
the member variables.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, 
this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, 
this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, 
OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), 
DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): 
[%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) 
tracker.currentRestriction();
+      Instant startRestriction = 
Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = 
Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, 
this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, 
filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills 
over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to 
make sure that we
+            // process all messages for this millisecond because sendTime is 
allegedly nano second
+            // resolution.
+            // 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);

Review comment:
       Should we be outputting elements with the timestamp of the message and 
should we be reporting a watermark?
   
   Even though you have a bounded SDF, it could be useful to report the 
watermark incase it is used in a streaming pipeline or users wanted to assign 
windows and perform grouping per window.
   
   The current logic will assign the input's timestamp to all outputs which 
won't allow users to use windowing to effectively window the elements being 
output without assigning timestamps themselves. If we do want to go down this 
path it is simple right now because this transform always starts with PBegin 
but what would you want to do it the timestamp of the record is before the 
timestamp of the input element to the SDF (since it is illegal to output 
messages with timestamps before the input elements timestamp)?
   
   To add watermark tracking based on timestamp of elements output, you would 
need to add the implementation for `@GetInitialWatermarkEstimatorState` and 
`@NewWatermarkEstimator` as seen in 
https://github.com/apache/beam/blob/27656d74fa9fb7085e2532275c821c3601c3f4b7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L763
   
   
   
   

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, 
this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, 
this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+

Review comment:
       This method is not necessary since `OffsetRange` supports 
`HasDefaultTracker` which your effectively invoking yourself.
   ```suggestion
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of

Review comment:
       Consider using `{@code ...}` when referring to code and `{@link ...} for 
thinks you can directly link against.
   
   ```suggestion
      * <p>This transform is optimized for dynamic splitting of {@code 
message.list} calls for large batches of
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.

Review comment:
       I'm not sure if the users need to know the exact implementation details 
as this may lock future maintainers into meeting these goals even when they can 
produce a more efficient solution in the future.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.
+   *
+   * <p>This will make more queries than necessary when used with very small 
data sets. (or very
+   * sparse data sets in the sendTime dimension).

Review comment:
       ```suggestion
      * sparse data sets in the {@code sendTime} dimension).
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.
+   *
+   * <p>This will make more queries than necessary when used with very small 
data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * <p>If you have large but sparse data (e.g. hours between consecutive 
message sendTimes) and
+   * know something about the time ranges where you have no data, consider 
using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where 
there is no data.

Review comment:
       I don't think we want people to do this since empty splits are not that 
expensive and will quickly clear out a block of work.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, 
ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime 
dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration 
initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, 
ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, 
initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream 
processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of 
offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {

Review comment:
       ```suggestion
     @VisibleForTesting
     static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -459,7 +532,13 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
      * @param filter the filter
      */
     ListHL7v2MessagesFn(String filter) {
+      new ListHL7v2MessagesFn(StaticValueProvider.of(filter), null);

Review comment:
       
https://stackoverflow.com/questions/285177/how-do-i-call-one-constructor-from-another-in-java
   ```suggestion
         this(StaticValueProvider.of(filter), null);
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, 
this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, 
this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, 
OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), 
DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): 
[%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,

Review comment:
       ```suggestion
           RestrictionTracker<OffsetRange, Long> tracker,
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, 
this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, 
this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, 
OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), 
DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): 
[%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) 
tracker.currentRestriction();
+      Instant startRestriction = 
Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = 
Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, 
this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, 
filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills 
over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to 
make sure that we
+            // process all messages for this millisecond because sendTime is 
allegedly nano second
+            // resolution.
+            // 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);
+              cursor = Instant.parse(msg.getSendTime());
+            }
+
+            if (i == page.size() && cursor.getMillis() == 
lastClaimedMilliSecond) {
+              // reached the end of the page and timestamp still in the 
claimed ms.
+              hangingClaim = true;
+              continue;
+            }
+
+            // If reached this point, msg.sendTime is outside the current 
claim.
+            // Need to claim time range up to (and including) the cursor to 
properly advance the
+            // tracker.
+            tracker.tryClaim(cursor.getMillis());
+            lastClaimedMilliSecond = cursor.getMillis();
+            LOG.info(
+                String.format(
+                    "After claiming between messages lastClaimedMilliSecond = 
%s",
+                    lastClaimedMilliSecond));
+          }
+        }
         messageListingLatencyMs.update(Instant.now().getMillis() - reqestTime);

Review comment:
       It would sense to move this into wherever we do the list call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to