jaketf commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r436837505
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
* FhirIO.Write.Result writeResult =
* output.apply("Execute FHIR Bundles",
FhirIO.executeBundles(options.getExistingFhirStore()));
*
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources",
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
* PCollection<HealthcareIOError<String>> failedBundles =
writeResult.getFailedInsertsWithErr();
*
+ * // [Begin Writing to Dead Letter Queue]
* failedBundles.apply("Write failed bundles to BigQuery",
* BigQueryIO
* .write()
* .to(option.getBQFhirExecuteBundlesDeadLetterTable())
* .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ * .withTypeFunction((HealthcareIOError<String> err) -> {
+ * String body = err.getDataResource();
+ * // TODO(user) insert logic to exctract type.
+ * return params;
+ * })
+ * .withSearchParametersFunction((HealthcareIOError<String> err)
-> {
Review comment:
this is a great suggestion will add a FhirSearchParameter Builder.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
* FhirIO.Write.Result writeResult =
* output.apply("Execute FHIR Bundles",
FhirIO.executeBundles(options.getExistingFhirStore()));
*
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources",
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
* PCollection<HealthcareIOError<String>> failedBundles =
writeResult.getFailedInsertsWithErr();
*
+ * // [Begin Writing to Dead Letter Queue]
* failedBundles.apply("Write failed bundles to BigQuery",
* BigQueryIO
* .write()
* .to(option.getBQFhirExecuteBundlesDeadLetterTable())
* .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ * .withTypeFunction((HealthcareIOError<String> err) -> {
+ * String body = err.getDataResource();
+ * // TODO(user) insert logic to exctract type.
+ * return params;
Review comment:
These are just examples. For R4 the exhaustive list is
[here](https://www.hl7.org/fhir/valueset-resource-types.html).
I don't want to add an enum because I think different versions of FHIR may
provide different resource types and I want FhirIO implementation be pretty
version agnostic. Though we test against all versions, the code path is
identical.
If we were to add an enum I wouldn't want to maintain it as part of beam.
If we want to institute stronger typing and domain sanity checks, we could
consider adding a dependency on the open source [HAPI
library](https://hapifhir.io/hapi-fhir/apidocs/hapi-fhir-structures-r4/org/hl7/fhir/r4/model/package-summary.html)
which is maintained by the HL7 community and is pretty popular.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
}
}
}
+
+ /**
+ * Create resources fhir io . create resources.
+ *
+ * @param <T> the type parameter
+ * @param fhirStore the fhir store
+ * @return the fhir io . create resources
+ */
+ public static <T> FhirIO.CreateResources<T>
createResources(ValueProvider<String> fhirStore) {
+ return new CreateResources(fhirStore);
+ }
+
+ /**
+ * Create resources fhir io . create resources.
+ *
+ * @param <T> the type parameter
+ * @param fhirStore the fhir store
+ * @return the fhir io . create resources
+ */
+ public static <T> FhirIO.CreateResources<T> createResources(String
fhirStore) {
+ return new CreateResources(fhirStore);
+ }
+ /**
+ * {@link PTransform} for Creating FHIR resources.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ */
+ public static class CreateResources<T> extends PTransform<PCollection<T>,
Write.Result> {
+ private final String fhirStore;
+ private SerializableFunction<T, String> ifNoneExistFunction;
+ private SerializableFunction<T, String> formatBodyFunction;
+ private SerializableFunction<T, String> typeFunction;
+ private static final Logger LOG =
LoggerFactory.getLogger(CreateResources.class);
+
+ /**
+ * Instantiates a new Create resources transform.
+ *
+ * @param fhirStore the fhir store
+ */
+ CreateResources(ValueProvider<String> fhirStore) {
+ this.fhirStore = fhirStore.get();
+ }
+
+ /**
+ * Instantiates a new Create resources.
+ *
+ * @param fhirStore the fhir store
+ */
+ CreateResources(String fhirStore) {
+ this.fhirStore = fhirStore;
+ }
+
+ /**
+ * This adds a {@link SerializableFunction} that reads an resource string
and extracts an
+ * If-None-Exists query for conditional create. Typically this will just
be extracting an ID to
+ * look for.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param ifNoneExistFunction the if none exist function
+ * @return the create resources
+ */
+ public CreateResources withIfNotExistFunction(
+ SerializableFunction<T, String> ifNoneExistFunction) {
+ this.ifNoneExistFunction = ifNoneExistFunction;
+ return this;
+ }
+
+ /**
+ * This adds a {@link SerializableFunction} that reads an resource string
and extracts an
+ * resource type.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param typeFunction for extracting type from a resource.
+ * @return the create resources
+ */
+ public CreateResources withTypeFunction(SerializableFunction<T, String>
typeFunction) {
+ this.typeFunction = typeFunction;
+ return this;
+ }
+ /**
+ * With format body function create resources.
+ *
+ * @param formatBodyFunction the format body function
+ * @return the create resources
+ */
+ public CreateResources withFormatBodyFunction(
Review comment:
The main idea for this was is if you have a PCollection of resource body
wrapped in HealthcareIOError (or some other custom user class, or a [HAPI
](https://hapifhir.io/) class) you use this function to "format" it as a string
JSON body for the REST request.
I considered the name "`withExtractBodyFunction`" but chose format because
this is what BigQueryIO API calls it.
I will add some more docs for this but do you think that there is a better
word than Body?
- [ ] Payload
- [ ] RequestBody
- [ ] JsonBody
- [ ] Request
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
* FhirIO.Write.Result writeResult =
* output.apply("Execute FHIR Bundles",
FhirIO.executeBundles(options.getExistingFhirStore()));
*
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources",
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
* PCollection<HealthcareIOError<String>> failedBundles =
writeResult.getFailedInsertsWithErr();
*
+ * // [Begin Writing to Dead Letter Queue]
* failedBundles.apply("Write failed bundles to BigQuery",
* BigQueryIO
* .write()
* .to(option.getBQFhirExecuteBundlesDeadLetterTable())
* .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ * .withTypeFunction((HealthcareIOError<String> err) -> {
+ * String body = err.getDataResource();
+ * // TODO(user) insert logic to exctract type.
+ * return params;
+ * })
+ * .withSearchParametersFunction((HealthcareIOError<String> err)
-> {
+ * String body = err.getDataResource();
+ * Map<String, String> params = new HashMap();
+ * // TODO(user) insert logic to exctract search query
parameters.
+ * return params;
+ * });
+ * // [End Reconciliation with Conditional Update]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional create
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Create",
+ * FhirIO.CreateResources(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ * .withIfNotExistsFunction((HealthcareIOError<String> err) -> {
+ * String body = err.getDataResource();
+ * // TODO(user) insert logic to exctract a query to be used in
If-Not-Exists header.
+ * return params;
Review comment:
Fully formatted query.
Will clarify in the docs and link to the examples defined in the
FhirIOTestUtil class.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -130,10 +132,18 @@
* FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link
PCollection} of {@link
* HealthcareIOError} containing the {@link String} that failed to be
ingested and the
* exception.
+ * <h3>Conditional Creating / Updating Resources</h3>
+ * {@link FhirIO} supports interfaces for conditional update. These can be
useful to handle
+ * scenarios where an executeBundle failed. For example if you tried to
create a resource that
+ * already exists you can grab the faield bodies of your {@link
FhirIO.ExecuteBundles} transform
+ * with {@link FhirIO.Write.Result#getFailedBodies()} perform some logic
on the reason for
+ * failures and if appropriate route this to {@link
FhirIO.ConditionalUpdate} or {@link
+ * FhirIO.CreateResources} to take the appropriate action on your FHIR
store.
Review comment:
ACK will do.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
}
}
}
+
+ /**
+ * Create resources fhir io . create resources.
+ *
+ * @param <T> the type parameter
+ * @param fhirStore the fhir store
+ * @return the fhir io . create resources
+ */
+ public static <T> FhirIO.CreateResources<T>
createResources(ValueProvider<String> fhirStore) {
+ return new CreateResources(fhirStore);
+ }
+
+ /**
+ * Create resources fhir io . create resources.
+ *
+ * @param <T> the type parameter
+ * @param fhirStore the fhir store
+ * @return the fhir io . create resources
+ */
+ public static <T> FhirIO.CreateResources<T> createResources(String
fhirStore) {
+ return new CreateResources(fhirStore);
+ }
+ /**
+ * {@link PTransform} for Creating FHIR resources.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ */
+ public static class CreateResources<T> extends PTransform<PCollection<T>,
Write.Result> {
+ private final String fhirStore;
+ private SerializableFunction<T, String> ifNoneExistFunction;
+ private SerializableFunction<T, String> formatBodyFunction;
+ private SerializableFunction<T, String> typeFunction;
+ private static final Logger LOG =
LoggerFactory.getLogger(CreateResources.class);
+
+ /**
+ * Instantiates a new Create resources transform.
+ *
+ * @param fhirStore the fhir store
+ */
+ CreateResources(ValueProvider<String> fhirStore) {
+ this.fhirStore = fhirStore.get();
+ }
+
+ /**
+ * Instantiates a new Create resources.
+ *
+ * @param fhirStore the fhir store
+ */
+ CreateResources(String fhirStore) {
+ this.fhirStore = fhirStore;
+ }
+
+ /**
+ * This adds a {@link SerializableFunction} that reads an resource string
and extracts an
+ * If-None-Exists query for conditional create. Typically this will just
be extracting an ID to
+ * look for.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param ifNoneExistFunction the if none exist function
+ * @return the create resources
+ */
+ public CreateResources withIfNotExistFunction(
+ SerializableFunction<T, String> ifNoneExistFunction) {
+ this.ifNoneExistFunction = ifNoneExistFunction;
+ return this;
+ }
+
+ /**
+ * This adds a {@link SerializableFunction} that reads an resource string
and extracts an
+ * resource type.
+ *
+ *
<p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param typeFunction for extracting type from a resource.
+ * @return the create resources
+ */
+ public CreateResources withTypeFunction(SerializableFunction<T, String>
typeFunction) {
+ this.typeFunction = typeFunction;
+ return this;
+ }
+ /**
+ * With format body function create resources.
+ *
+ * @param formatBodyFunction the format body function
+ * @return the create resources
+ */
+ public CreateResources withFormatBodyFunction(
Review comment:
The main idea for this was is if you have a PCollection of resource body
wrapped in HealthcareIOError (or some other custom user class, or a [HAPI
](https://hapifhir.io/) class) you use this function to "format" it as a string
JSON body for the REST request.
I considered the name "`withExtractBodyFunction`" but chose format because
this is what BigQueryIO API calls it.
I will add some more docs for this but do you think that there is a better
word than Body?
- [ ] Payload
- [ ] RequestBody
- [ ] JsonBody
- [ ] Request
- [ ] Resource
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
* FhirIO.Write.Result writeResult =
* output.apply("Execute FHIR Bundles",
FhirIO.executeBundles(options.getExistingFhirStore()));
*
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources",
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
* PCollection<HealthcareIOError<String>> failedBundles =
writeResult.getFailedInsertsWithErr();
*
+ * // [Begin Writing to Dead Letter Queue]
* failedBundles.apply("Write failed bundles to BigQuery",
* BigQueryIO
* .write()
* .to(option.getBQFhirExecuteBundlesDeadLetterTable())
* .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ * .withTypeFunction((HealthcareIOError<String> err) -> {
+ * String body = err.getDataResource();
+ * // TODO(user) insert logic to exctract type.
+ * return params;
+ * })
+ * .withSearchParametersFunction((HealthcareIOError<String> err)
-> {
+ * String body = err.getDataResource();
+ * Map<String, String> params = new HashMap();
+ * // TODO(user) insert logic to exctract search query
parameters.
+ * return params;
+ * });
+ * // [End Reconciliation with Conditional Update]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional create
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Create",
+ * FhirIO.CreateResources(fhirStore)
+ *
.withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
Review comment:
open to changing the name.
Let's continue the conversation in the [comment
above](https://github.com/apache/beam/pull/11702#discussion_r436846234) and be
consistent as these serve a similar purpose.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]