[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961
 ] 

Jacob Ferriero commented on BEAM-9856:
--------------------------------------

The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by createTime. We should be able to 
use this as our restriction dimension to make this a splittable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on its 
restriction and will orderBy createTime in order to closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against the HL7v2 store to get the 
earliest createTime 
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split the timestamp range based on a fraction. This will 
require that an existing HL7v2 Messages.List call can be notified to "stop 
paginating at an arbitrary createTime". Note that each List query might not 
get to the end of its createTime filter (due to splitting). Instead, it should 
just stop processing results when it encounters a record past the end of its 
restriction.
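
To make the tracker and split behaviour above concrete, here is a rough sketch in plain Java. It is not Beam code: CreateTimeRangeTracker is a hypothetical stand-in that only imitates the tryClaim / trySplit shape of OffsetRangeTracker, and it assumes createTime can be compared at millisecond precision and that results arrive ordered by createTime.

```java
import java.time.Instant;

/** Hypothetical tracker over a [start, end) createTime range; not a Beam class. */
final class CreateTimeRangeTracker {
  private final Instant start; // inclusive
  private Instant end;         // exclusive; moves earlier when a split succeeds
  private Instant lastClaimed; // the "watermark": latest createTime claimed so far

  CreateTimeRangeTracker(Instant start, Instant end) {
    if (!start.isBefore(end)) {
      throw new IllegalArgumentException("empty createTime range");
    }
    this.start = start;
    this.end = end;
  }

  /** Returns false for a record at or past the end, which is the signal to stop paginating. */
  boolean tryClaim(Instant createTime) {
    if (createTime.isBefore(start)
        || (lastClaimed != null && createTime.isBefore(lastClaimed))) {
      throw new IllegalArgumentException("createTime out of order: " + createTime);
    }
    if (!createTime.isBefore(end)) {
      return false;
    }
    lastClaimed = createTime;
    return true;
  }

  /**
   * Splits the unclaimed remainder at the given fraction. Returns the residual
   * range as {start, end}, or null if nothing is left to split off.
   */
  Instant[] trySplit(double fraction) {
    long cur = lastClaimed == null ? start.toEpochMilli() - 1 : lastClaimed.toEpochMilli();
    long endMs = end.toEpochMilli();
    long splitMs = cur + Math.max(1L, (long) ((endMs - cur) * fraction));
    if (splitMs >= endMs) {
      return null;
    }
    Instant split = Instant.ofEpochMilli(splitMs);
    Instant[] residual = {split, end};
    end = split; // the in-flight List call now stops at the split point
    return residual;
  }

  Instant end() {
    return end;
  }
}
```

The pagination loop would call tryClaim with each message's createTime and break out on the first false, even if the List query's own filter extends further.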

This should allow us to more eagerly emit results as certain restrictions are 
completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction, 
based on the assumption that for most use cases you'll want to split at 
least n times upfront (e.g. partition by day/hour to start and dynamically 
split from there)? Is this an anti-pattern because
* Should the initial restriction have an endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? Or should ListMessages 
just scroll until it is "caught up" and there are no newer messages?
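
For illustration only, the upfront partitioning and the default endTime from the questions above could look roughly like the plain-Java sketch below. InitialCreateTimeSplits, partition and resolveEnd are hypothetical names, not part of HL7v2IO. In the Beam Java SDK, a method annotated with @SplitRestriction would be one natural place to emit such ranges, but whether that is the canonical approach is exactly the open question.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for upfront createTime partitioning; not part of HL7v2IO. */
final class InitialCreateTimeSplits {

  /** Cuts [start, end) into consecutive {start, end} ranges of at most one step each. */
  static List<Instant[]> partition(Instant start, Instant end, Duration step) {
    if (step.isZero() || step.isNegative()) {
      throw new IllegalArgumentException("step must be positive");
    }
    List<Instant[]> ranges = new ArrayList<>();
    Instant cursor = start;
    while (cursor.isBefore(end)) {
      Instant next = cursor.plus(step);
      if (next.isAfter(end)) {
        next = end; // the last range may be shorter than step
      }
      ranges.add(new Instant[] {cursor, next});
      cursor = next;
    }
    return ranges;
  }

  /** Assumed default: use the user's endTime if given, otherwise "now" at resolution time. */
  static Instant resolveEnd(Instant userEnd) {
    return userEnd != null ? userEnd : Instant.now();
  }
}
```

For example, partitioning 2020-05-01T00:00:00Z to 2020-05-01T02:30:00Z with a one-hour step yields three ranges, the last covering only the final 30 minutes. Each range could then be split dynamically from there.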


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded 
backfill or replay use case. I believe real-time use cases should use the 
Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store 
with HL7v2IO.readAll() (as this allows for much greater parallelization and is 
more likely to "keep up" during higher throughput). However, there is nothing 
stopping a user from using ListMessages against a store that is still being 
updated. If this becomes a splittable DoFn that fires many ListMessages calls 
throughout its lifetime and our initial restriction is [minCreateTime, inf), 
this could become an unbounded source (or, more specifically, unbounded per 
element). I feel we should force ListMessages to be bounded per element by 
always having an endCreateTime.

> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-9856
>                 URL: https://issues.apache.org/jira/browse/BEAM-9856
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jacob Ferriero
>            Assignee: Jacob Ferriero
>            Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
