[
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961
]
Jacob Ferriero commented on BEAM-9856:
--------------------------------------
The existing GA [HL7v2 Messages.List
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
allows us to specify a filter and order by createTime. We should be able to
use this as our restriction dimension to make this a splittable DoFn.
I will investigate the feasibility of this.
Basic Design Proposal:
Each Messages.List query will have a createTime filter based on its
restriction and orderBy createTime in order to closely mimic the
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
pattern.
* getInitialRestriction: run a query against HL7v2 store to get earliest
createTime
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split timestamp range based on fraction (this will require
that an existing HL7v2 Messages.List call can be notified to "stop paginating
at an arbitrary createTime"). Note that each List query might not get to the
end of its createTime filter (due to splitting). Instead it should just stop
processing results when it encounters a record past the end of its
restriction.
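A minimal sketch of the tracker idea above, in plain Java without any Beam dependency. The class name CreateTimeRangeTracker and its methods are hypothetical (they only borrow the tryClaim / trySplit vocabulary of OffsetRangeTracker), and millisecond granularity for the split point is an assumption:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Hypothetical tracker over a half-open createTime range [start, end). Not a Beam class. */
final class CreateTimeRangeTracker {
  private final Instant start; // inclusive
  private Instant end;         // exclusive; moves earlier when the range is split
  private Instant lastClaimed; // the "watermark"; null until something is claimed

  CreateTimeRangeTracker(Instant start, Instant end) {
    if (!start.isBefore(end)) {
      throw new IllegalArgumentException("start must be before end");
    }
    this.start = start;
    this.end = end;
  }

  /** Claims a record's createTime; returns false once the record is at or past the end. */
  boolean tryClaim(Instant createTime) {
    if (!createTime.isBefore(end)) {
      return false;
    }
    lastClaimed = createTime;
    return true;
  }

  /**
   * Splits the unclaimed remainder at the given fraction. This tracker keeps
   * [start, splitPoint) and the returned {splitPoint, end} is the residual.
   * Returns null when nothing is left to hand off.
   */
  Instant[] trySplit(double fraction) {
    // Assumption: millisecond granularity is good enough for choosing a split point.
    Instant from = lastClaimed == null ? start : lastClaimed.plusMillis(1);
    long remainingMillis = Duration.between(from, end).toMillis();
    if (remainingMillis <= 0) {
      return null;
    }
    Instant splitPoint = from.plusMillis((long) (remainingMillis * fraction));
    if (!splitPoint.isBefore(end)) {
      return null;
    }
    Instant[] residual = {splitPoint, end};
    end = splitPoint;
    return residual;
  }

  /** Processes createTime-ordered results, stopping at the first one past the end. */
  int process(List<Instant> orderedCreateTimes) {
    int processed = 0;
    for (Instant createTime : orderedCreateTimes) {
      if (!tryClaim(createTime)) {
        break; // the List query outlived its (possibly shrunk) restriction
      }
      processed++;
    }
    return processed;
  }
}
```

The process loop is where a running List call would notice that a split has moved the end of its restriction: the results are ordered by createTime, so the first rejected record means everything after it belongs to a residual.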
This should allow us to more eagerly emit results as certain restrictions are
completed.
Open Questions:
* Is there a canonical way of specifying more than one initial restriction,
based on the assumption that for most use cases you know you'll want to split
at least n times upfront (e.g. partition by day/hour to start and dynamically
split from there)? Is this an anti-pattern because
* Should the initial restriction have an endTime? Should this be an optional
user parameter for the transform? What should the default be (e.g.
Instant.now() just before firing the first query)? Or should ListMessages
just scroll until it is "caught up" (there are no newer messages)?
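To make the first open question concrete, here is a rough sketch of partitioning a bounded createTime range into fixed-size windows upfront (e.g. one per hour), each of which could then be split dynamically. The class InitialPartitions and its partition method are hypothetical helpers, not part of Beam or HL7v2IO, and the sketch says nothing about where Beam would best invoke such a step:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for choosing several initial restrictions upfront. */
final class InitialPartitions {
  /** Splits [start, end) into consecutive half-open windows of at most the given size. */
  static List<Instant[]> partition(Instant start, Instant end, Duration size) {
    if (size.isZero() || size.isNegative()) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<Instant[]> windows = new ArrayList<>();
    Instant cursor = start;
    while (cursor.isBefore(end)) {
      Instant next = cursor.plus(size);
      if (next.isAfter(end)) {
        next = end; // the last window may be shorter than the others
      }
      windows.add(new Instant[] {cursor, next});
      cursor = next;
    }
    return windows;
  }
}
```

Note that this only works when the range has an end, which ties the first question to the second.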
Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded
backfill or replay use case. I believe real-time use cases should use the
Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store
with HL7v2IO.readAll() (as this allows for much greater parallelization and is
more likely to "keep up" during higher throughput). However, there is nothing
stopping a user from using ListMessages against a store that is still being
updated. If this becomes a splittable DoFn that fires many ListMessages
throughout its lifetime and our initial restriction is [minCreateTime,
inf), this could become an unbounded source (or more specifically unbounded per
element). I feel we should force ListMessages to be bounded per element by
always having an endCreateTime.
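As an illustration of "bounded per element", the sketch below always resolves an end time before any query is built: the user-supplied value if there is one, otherwise a "now" captured once. The class BoundedListFilter is hypothetical, and the filter field name (create_time) and comparison syntax are assumptions that would need to be checked against the Messages.List reference:

```java
import java.time.Instant;

/** Hypothetical helper that forces every ListMessages restriction to have an end. */
final class BoundedListFilter {
  /** Uses the user's end time when given; otherwise the time captured before the first query. */
  static Instant resolveEnd(Instant userEnd, Instant nowBeforeFirstQuery) {
    return userEnd != null ? userEnd : nowBeforeFirstQuery;
  }

  /**
   * Builds a filter for the half-open range [start, end).
   * Assumption: the store accepts a create_time field with RFC 3339 timestamps.
   */
  static String createTimeFilter(Instant start, Instant end) {
    return String.format("create_time >= \"%s\" AND create_time < \"%s\"", start, end);
  }
}
```

With the end fixed upfront, messages that arrive while the transform is running fall outside every restriction, so the work per element stays finite.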
> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---------------------------------------------------------------------------
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Jacob Ferriero
> Assignee: Jacob Ferriero
> Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement
> Call.
>
> In the future if a bulk export API becomes available that would allow
> splitting on some dimension (e.g. create time), this should be refactored as
> a splittable DoFn or at least run several sub queries so that we are not
> listing an entire store in a single thread.
>
> This could look like paginating through each hour of data w/ in the time
> frame that the store spans, in a separate thread.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)