lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r428095390



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +523,77 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker<OffsetRange, Long> tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
-      long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
-        messageListingLatencyMs.update(Instant.now().getMillis() - reqestTime);
-        page.forEach(context::output);
-        reqestTime = Instant.now().getMillis();
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
+      Instant cursor;
+      long lastClaimedMilliSecond = startRestriction.getMillis() - 1;
+      for (HL7v2Message msg : FluentIterable.concat(pages)) {
+        cursor = Instant.parse(msg.getSendTime());
+        if (cursor.getMillis() > lastClaimedMilliSecond && tracker.tryClaim(cursor.getMillis())) {
+          lastClaimedMilliSecond = cursor.getMillis();
+        }
+
+        if (cursor.getMillis() == lastClaimedMilliSecond) { // loop over messages in millisecond.
+          outputReceiver.output(msg);
+        }

Review comment:
       ```suggestion
           if (cursor.getMillis() > lastClaimedMilliSecond) {
             // Return early after the first claim failure, preventing us from
             // iterating through the remaining messages.
             if (!tracker.tryClaim(cursor.getMillis())) {
               return;
             }
             lastClaimedMilliSecond = cursor.getMillis();
           }
   
           outputReceiver.output(msg);
   ```
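
The control flow of the suggestion can be tried outside Beam with a small stand-in. This is only a sketch under stated assumptions: `SimpleOffsetTracker` is a hypothetical, simplified substitute for `OffsetRangeTracker` (it only models claims succeeding while the offset is below the end of `[from, to)`), messages are reduced to their `sendTime` in milliseconds, and they are assumed to arrive in ascending `sendTime` order.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {

  /** Hypothetical stand-in for a tracker over the half-open range [from, to). */
  static final class SimpleOffsetTracker {
    private final long to;

    SimpleOffsetTracker(long from, long to) {
      this.to = to;
    }

    /** A claim succeeds only while the offset is still inside the range. */
    boolean tryClaim(long offset) {
      return offset < to;
    }
  }

  /** Returns the send times (ms) that would be emitted for the restriction [from, to). */
  static List<Long> process(long from, long to, long[] sendTimesMillis) {
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(from, to);
    List<Long> emitted = new ArrayList<>();
    long lastClaimedMilliSecond = from - 1;
    for (long sendTime : sendTimesMillis) {
      if (sendTime > lastClaimedMilliSecond) {
        // First message of a new millisecond: claim it, or stop at the first failure
        // instead of iterating through the remaining messages.
        if (!tracker.tryClaim(sendTime)) {
          return emitted;
        }
        lastClaimedMilliSecond = sendTime;
      }
      // Every message inside an already claimed millisecond is emitted.
      emitted.add(sendTime);
    }
    return emitted;
  }

  public static void main(String[] args) {
    // Two messages share ms 10 and two share ms 12; ms 13 is outside [10, 13).
    System.out.println(process(10, 13, new long[] {10, 10, 11, 12, 12, 13, 14}));
  }
}
```

With the sample input, both messages in millisecond 12 are emitted after a single claim, and the loop exits at 13 without looking at 14.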

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to make sure that we
+            // process all messages for this millisecond because sendTime is allegedly nano second
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);

Review comment:
       Yes, we can defer this, since it would change what people get from the
   existing implementation and would likely require an opt-in to avoid breaking
   existing users.
   
   If nobody depends on this transform yet, it would be wise to address it
   before adoption, since many users expect the output timestamp to match the
   record's source timestamp. This may not apply to this specific source and
   depends on what users expect, so it is your judgement call.
   
   If you do go with changing the output timestamp, watermark tracking would
   help streaming pipelines perform better. The current implementation still
   produces correct results with or without it, and adding it later would be
   very safe (it may expose problems in pipelines that were already broken for
   other reasons).
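
To make the timestamp option concrete, here is a rough, Beam-free sketch of the idea. The names `Timestamped` and `MonotonicWatermark` are hypothetical stand-ins, `java.time.Instant` is used instead of the Joda `Instant` in the PR so the example is self-contained, and messages are assumed to arrive ordered by `sendTime`. In the actual `DoFn`, the equivalent step would presumably be `outputReceiver.outputWithTimestamp(msg, ...)` with the parsed `sendTime`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SendTimeTimestampSketch {

  /** Hypothetical stand-in for an output element paired with its output timestamp. */
  static final class Timestamped {
    final String payload;
    final Instant timestamp;

    Timestamped(String payload, Instant timestamp) {
      this.payload = payload;
      this.timestamp = timestamp;
    }
  }

  /** Hypothetical watermark holder that only ever moves forward. */
  static final class MonotonicWatermark {
    private Instant current = Instant.MIN;

    void observe(Instant timestamp) {
      if (timestamp.isAfter(current)) {
        current = timestamp;
      }
    }

    Instant get() {
      return current;
    }
  }

  /** Each message is {payload, sendTime}; sendTime is an RFC 3339 UTC string. */
  static List<Timestamped> emit(String[][] messages, MonotonicWatermark watermark) {
    List<Timestamped> out = new ArrayList<>();
    for (String[] message : messages) {
      Instant sendTime = Instant.parse(message[1]);
      // Use the record's source timestamp as the output timestamp.
      out.add(new Timestamped(message[0], sendTime));
      // Advance the watermark estimate to the latest timestamp seen so far.
      watermark.observe(sendTime);
    }
    return out;
  }

  public static void main(String[] args) {
    MonotonicWatermark watermark = new MonotonicWatermark();
    String[][] messages = {
      {"msg-a", "2020-05-20T10:15:30Z"},
      {"msg-b", "2020-05-20T10:15:31.5Z"},
    };
    for (Timestamped t : emit(messages, watermark)) {
      System.out.println(t.payload + " @ " + t.timestamp);
    }
    System.out.println("watermark: " + watermark.get());
  }
}
```

Advancing the watermark to the latest emitted timestamp is only reasonable here because of the ordering assumption; an unordered source would need a more conservative estimate.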
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

