lukecwik commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427616856
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: wouldn't this just be a small amount of waste since we would effectively get an empty response? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of Review comment: consider using `<ol>` and `<li>` tags in the javadoc for your ordered list ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * <p>This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * <p>If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. 
+ */ public static class ListHL7v2Messages extends PTransform<PBegin, PCollection<HL7v2Message>> { - private final List<String> hl7v2Stores; - private final String filter; + private final ValueProvider<List<String>> hl7v2Stores; + private ValueProvider<String> filter; + private Duration initialSplitDuration; Review comment: even if a member variable is null, it should still be final since it doesn't look like we mutate it locally. The same reasoning applies to the other places where I suggest this change. ```suggestion private final ValueProvider<String> filter; private final Duration initialSplitDuration; ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + } + + /** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ + ListHL7v2Messages( + ValueProvider<List<String>> hl7v2Stores, + ValueProvider<String> filter, + Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initialSplitDuration; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) { - this.hl7v2Stores = hl7v2Stores.get(); + this.hl7v2Stores = hl7v2Stores; this.filter = null; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + * @param initialSplitDuration the initial split duration + */ + ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.initialSplitDuration = initialSplitDuration; + } + @Override public PCollection<HL7v2Message> expand(PBegin input) { return input - .apply(Create.of(this.hl7v2Stores)) - .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter))) + .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of()))) + .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x)) + .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration))) .setCoder(new HL7v2MessageCoder()) // Break fusion to encourage parallelization of downstream processing. .apply(Reshuffle.viaRandomKey()); } } + /** + * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the + * Message.sendTime dimension. + */ + @BoundedPerElement static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> { - private final String filter; + private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class); + private ValueProvider<String> filter; + // These control the initial restriction split which means that the list of integer pairs + // must comfortably fit in memory. + private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1); + private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1); + private Duration initialSplitDuration; + private Instant from; + private Instant to; Review comment: Can any of these be final? 
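To illustrate (a rough sketch, not the PR's code): with the statics grouped at the top and `final` applied to the members that are only assigned in the constructors, the declarations could look roughly like this, assuming `from`/`to` still need to be assigned inside `@GetInitialRestriction` and therefore stay non-final:

```java
@BoundedPerElement
static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
  // Statics grouped together at the top of the class.
  private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
  // These control the initial restriction split, so the resulting list of ranges must fit in memory.
  private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1);
  private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1);

  // Assigned once in the constructors and never mutated, so they can be final even when null.
  private final ValueProvider<String> filter;
  private final Duration initialSplitDuration;

  // Assigned in @GetInitialRestriction, so they cannot be final.
  private Instant from;
  private Instant to;

  ListHL7v2MessagesFn(ValueProvider<String> filter, Duration initialSplitDuration) {
    this.filter = filter;
    this.initialSplitDuration = initialSplitDuration;
  }

  // ... remaining constructors and SDF methods unchanged ...
}
```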
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +547,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. + public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, + OutputReceiver<HL7v2Message> outputReceiver) + throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream<HL7v2Message> page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List<HL7v2Message> page : pages) { // loop over pages. + int i = 0; + HL7v2Message msg = page.get(i); + while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { + // This means we have claimed an entire millisecond we need to make sure that we + // process all messages for this millisecond because sendTime is allegedly nano second + // resolution. 
+ // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message + while (cursor.getMillis() == lastClaimedMilliSecond + && i < page.size()) { // loop over messages in millisecond. + outputReceiver.output(msg); + msg = page.get(i++); Review comment: I agree with @pabloem that this logic is not simple to follow and I think you could really simplify your code if you used https://guava.dev/releases/21.0/api/docs/com/google/common/collect/FluentIterable.html#concat-java.lang.Iterable- since it would convert `Iterable<List<HL7v2Message>>` into `Iterable<HL7v2Message>` and only accesses the elements lazily so it wouldn't prefetch everything. This would allow you to not worry that `messages` are in a `page` and you're processing multiple `pages` (a sketch of this appears at the end of this message). ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + } + + /** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ + ListHL7v2Messages( + ValueProvider<List<String>> hl7v2Stores, + ValueProvider<String> filter, + Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initialSplitDuration; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) { - this.hl7v2Stores = hl7v2Stores.get(); + this.hl7v2Stores = hl7v2Stores; this.filter = null; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + * @param initialSplitDuration the initial split duration + */ + ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.initialSplitDuration = initialSplitDuration; + } + @Override public PCollection<HL7v2Message> expand(PBegin input) { return input - .apply(Create.of(this.hl7v2Stores)) - .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter))) + .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of()))) + .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x)) + .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration))) .setCoder(new HL7v2MessageCoder()) // Break fusion to encourage parallelization of downstream processing. .apply(Reshuffle.viaRandomKey()); } } + /** + * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the + * Message.sendTime dimension. 
+ private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1); + private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1); Review comment: nit: Might want to group your statics at the top together separate from the member variables. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. + public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, + OutputReceiver<HL7v2Message> outputReceiver) + throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream<HL7v2Message> page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List<HL7v2Message> page : pages) { // loop over pages. 
+ int i = 0; + HL7v2Message msg = page.get(i); + while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { + // This means we have claimed an entire millisecond we need to make sure that we + // process all messages for this millisecond because sendTime is allegedly nano second + // resolution. + // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message + while (cursor.getMillis() == lastClaimedMilliSecond + && i < page.size()) { // loop over messages in millisecond. + outputReceiver.output(msg); Review comment: Should we be outputting elements with the timestamp of the message and should we be reporting a watermark? Even though you have a bounded SDF, it could be useful to report the watermark in case it is used in a streaming pipeline or users wanted to assign windows and perform grouping per window. The current logic will assign the input's timestamp to all outputs, which won't allow users to use windowing to effectively window the elements being output without assigning timestamps themselves. If we do want to go down this path, it is simple right now because this transform always starts with PBegin, but what would you want to do if the timestamp of the record is before the timestamp of the input element to the SDF (since it is illegal to output messages with timestamps before the input element's timestamp)? To add watermark tracking based on timestamp of elements output, you would need to add the implementation for `@GetInitialWatermarkEstimatorState` and `@NewWatermarkEstimator` as seen in https://github.com/apache/beam/blob/27656d74fa9fb7085e2532275c821c3601c3f4b7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L763 ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + Review comment: This method is not necessary since `OffsetRange` supports `HasDefaultTracker`, which you're effectively invoking yourself. ```suggestion ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. 
+ * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of Review comment: Consider using `{@code ...}` when referring to code and `{@link ...}` for things you can directly link against. ```suggestion * <p>This transform is optimized for dynamic splitting of {@code message.list} calls for large batches of ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * <p>This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). Review comment: ```suggestion * sparse data sets in the {@code sendTime} dimension). ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. 
+ * + * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * <p>This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * <p>If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. Review comment: I don't think we want people to do this since empty splits are not that expensive and will quickly clear out a block of work. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + } + + /** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ + ListHL7v2Messages( + ValueProvider<List<String>> hl7v2Stores, + ValueProvider<String> filter, + Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initialSplitDuration; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) { - this.hl7v2Stores = hl7v2Stores.get(); + this.hl7v2Stores = hl7v2Stores; this.filter = null; } + /** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + * @param initialSplitDuration the initial split duration + */ + ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.initialSplitDuration = initialSplitDuration; + } + @Override public PCollection<HL7v2Message> expand(PBegin input) { return input - .apply(Create.of(this.hl7v2Stores)) - .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter))) + .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of()))) + .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x)) + .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration))) .setCoder(new HL7v2MessageCoder()) // Break fusion to encourage parallelization of downstream processing. .apply(Reshuffle.viaRandomKey()); } } + /** + * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the + * Message.sendTime dimension. 
+ */ + @BoundedPerElement static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> { Review comment: ```suggestion @VisibleForTesting static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> { ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -459,7 +532,13 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2MessagesFn(String filter) { + new ListHL7v2MessagesFn(StaticValueProvider.of(filter), null); Review comment: https://stackoverflow.com/questions/285177/how-do-i-call-one-constructor-from-another-in-java ```suggestion this(StaticValueProvider.of(filter), null); ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. 
+ public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, Review comment: ```suggestion RestrictionTracker<OffsetRange, Long> tracker, ``` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ########## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } + @GetInitialRestriction + public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) + throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); + } + + @SplitRestriction + public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) { + List<OffsetRange> splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { + out.output(s); + } + } + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement - public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. + public void listMessages( + @Element String hl7v2Store, + RestrictionTracker tracker, + OutputReceiver<HL7v2Message> outputReceiver) + throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream<HL7v2Message> page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List<HL7v2Message> page : pages) { // loop over pages. 
+ int i = 0; + HL7v2Message msg = page.get(i); + while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { + // This means we have claimed an entire millisecond we need to make sure that we + // process all messages for this millisecond because sendTime is allegedly nano second + // resolution. + // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message + while (cursor.getMillis() == lastClaimedMilliSecond + && i < page.size()) { // loop over messages in millisecond. + outputReceiver.output(msg); + msg = page.get(i++); + cursor = Instant.parse(msg.getSendTime()); + } + + if (i == page.size() && cursor.getMillis() == lastClaimedMilliSecond) { + // reached the end of the page and timestamp still in the claimed ms. + hangingClaim = true; + continue; + } + + // If reached this point, msg.sendTime is outside the current claim. + // Need to claim time range up to (and including) the cursor to properly advance the + // tracker. + tracker.tryClaim(cursor.getMillis()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "After claiming between messages lastClaimedMilliSecond = %s", + lastClaimedMilliSecond)); + } + } messageListingLatencyMs.update(Instant.now().getMillis() - reqestTime); Review comment: It would make sense to move this into wherever we do the list call.
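To make the FluentIterable.concat suggestion above concrete, here is a rough sketch (not the PR's code) of how the claim loop could look once the pages are flattened into a single lazy Iterable; it assumes the same `pages`, `tracker`, and `outputReceiver` as in the `listMessages` signature above, and that the list results are ordered by sendTime:

```java
// Flatten Iterable<List<HL7v2Message>> into a lazy Iterable<HL7v2Message> so the loop no
// longer needs to track page boundaries or "hanging" claims that spill across pages.
// (Beam code would typically use the project's vendored Guava copy of FluentIterable.)
Iterable<HL7v2Message> messages = FluentIterable.concat(pages);
long lastClaimedMilliSecond = Long.MIN_VALUE; // sentinel: nothing claimed yet
for (HL7v2Message msg : messages) {
  long sendTimeMillis = Instant.parse(msg.getSendTime()).getMillis();
  if (sendTimeMillis != lastClaimedMilliSecond) {
    if (!tracker.tryClaim(sendTimeMillis)) {
      return; // The rest of the restriction was split off; stop processing here.
    }
    lastClaimedMilliSecond = sendTimeMillis;
  }
  // Messages that share an already-claimed millisecond are output under that claim,
  // since sendTime may have sub-millisecond resolution.
  outputReceiver.output(msg);
}
```

With one tryClaim per distinct sendTime millisecond, the per-page index and the hangingClaim flag are no longer needed.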