jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427545617
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +547,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. 
* - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. + public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, + OutputReceiver<HL7v2Message> outputReceiver) + throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream<HL7v2Message> page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List<HL7v2Message> page : pages) { // loop over pages. 
+ int i = 0; + HL7v2Message msg = page.get(i); + while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { Review comment: tl;dr: No, that is not possible based on L629-640. `tryClaim` on L624 claims `lastClaimedMilliSecond`; `tryClaim` on L645 claims the millisecond of the cursor (which is advanced by the while loop in L629-633). If the cursor is not advanced to a new millisecond (this happens when many messages at the end of a page came in the same millisecond), the if block in L636-640 contains `continue`, which will eagerly exit this iteration of the while loop before getting to the `tryClaim` on L645 again. I called this scenario "hangingClaim" because we cannot know if the first message(s) of the next page will also be from this millisecond. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org