pabloem commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427546641
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +547,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. + public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, + OutputReceiver<HL7v2Message> outputReceiver) + throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream<HL7v2Message> page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List<HL7v2Message> page : pages) { // loop over pages. + int i = 0; + HL7v2Message msg = page.get(i); + while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { Review comment: sgtm ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org