msbukal commented on a change in pull request #14691:
URL: https://github.com/apache/beam/pull/14691#discussion_r626868844
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1417,8 +1428,69 @@ public void executeBundles(ProcessContext context) {
}
}
+ /** The type Patch resources. */
+ public static class PatchResources
+ extends PTransform<PCollection<FhirPatchParameter>, Write.Result> {
+ @Override
+ public FhirIO.Write.Result expand(PCollection<FhirPatchParameter> input) {
+ PCollectionTuple bodies =
+ input.apply(
+ ParDo.of(new PatchResourcesFn())
+ .withOutputTags(Write.SUCCESSFUL_BODY,
TupleTagList.of(Write.FAILED_BODY)));
+ bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
+
bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+ return Write.Result.in(input.getPipeline(), bodies);
+ }
+
+ /** The type Patch resources fn. */
+ static class PatchResourcesFn extends DoFn<FhirPatchParameter, String> {
+
+ private static final Counter PATCH_RESOURCES_ERRORS =
+ Metrics.counter(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX +
"patch_resources_error_count");
+ private static final Counter PATCH_RESOURCES_SUCCESS =
+ Metrics.counter(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX +
"patch_resources_success_count");
+ private static final Distribution PATCH_RESOURCES_LATENCY_MS =
+ Metrics.distribution(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX +
"patch_resources_latency_ms");
+
+ private transient HealthcareApiClient client;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * Initialize healthcare client.
+ *
+ * @throws IOException If the Healthcare client cannot be created.
+ */
+ @Setup
+ public void initClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
+ }
+
+ @ProcessElement
+ public void patchResources(ProcessContext context) {
+ FhirPatchParameter patchParameter = context.element();
Review comment:
Done, with a small numShards to help avoid exceeding the quota.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org