pabloem commented on a change in pull request #14691:
URL: https://github.com/apache/beam/pull/14691#discussion_r626136094



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1417,8 +1428,69 @@ public void executeBundles(ProcessContext context) {
     }
   }
 
+  /** The type Patch resources. */
+  public static class PatchResources
+      extends PTransform<PCollection<FhirPatchParameter>, Write.Result> {
+    @Override
+    public FhirIO.Write.Result expand(PCollection<FhirPatchParameter> input) {
+      PCollectionTuple bodies =
+          input.apply(
+              ParDo.of(new PatchResourcesFn())
+                  .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
+      bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
+      bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+      return Write.Result.in(input.getPipeline(), bodies);
+    }
+
+    /** The type Write Fhir fn. */
+    static class PatchResourcesFn extends DoFn<FhirPatchParameter, String> {
+
+      private static final Counter PATCH_RESOURCES_ERRORS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
+      private static final Counter PATCH_RESOURCES_SUCCESS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
+      private static final Distribution PATCH_RESOURCES_LATENCY_MS =
+          Metrics.distribution(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");
+
+      private transient HealthcareApiClient client;
+      private final ObjectMapper mapper = new ObjectMapper();
+
+      /**
+       * Initialize healthcare client.
+       *
+       * @throws IOException If the Healthcare client cannot be created.
+       */
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @ProcessElement
+      public void patchResources(ProcessContext context) {
+        FhirPatchParameter patchParameter = context.element();

Review comment:
       This looks fine. I wonder whether it would help to group the elements
into batches and use a thread pool or async calls to issue multiple requests at
the same time. That may improve the utilization and throughput of an individual
worker.
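
For illustration, here is a minimal sketch of the thread-pool idea in plain Java, independent of Beam and of the Healthcare client. Everything in it is hypothetical: `BatchedPatchSketch`, `Outcome`, `patchBatch`, and the `sendPatch` function (a stand-in for a call such as `client.patchFhirResource(...)`) are not part of this PR. It assumes the batch has already been assembled upstream, for example with Beam's `GroupIntoBatches`, and only shows how one batch could be fanned out over a pool while per-request failures are still captured individually.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch, not part of FhirIO: issue one batch of requests concurrently. */
class BatchedPatchSketch {

  /** Outcome of a single request: error is null on success, body is null on failure. */
  static final class Outcome {
    final String body;
    final Exception error;

    Outcome(String body, Exception error) {
      this.body = body;
      this.error = error;
    }

    boolean ok() {
      return error == null;
    }
  }

  /**
   * Submits every request in the batch to the pool and waits for all of them.
   * Outcomes are returned in the same order as the input batch.
   */
  static List<Outcome> patchBatch(
      List<String> batch, Function<String, String> sendPatch, ExecutorService pool)
      throws InterruptedException {
    List<Callable<Outcome>> tasks = new ArrayList<>();
    for (String request : batch) {
      tasks.add(
          () -> {
            try {
              return new Outcome(sendPatch.apply(request), null);
            } catch (RuntimeException e) {
              // Keep the failure per request so it can go to a failed-output tag.
              return new Outcome(null, e);
            }
          });
    }
    List<Outcome> outcomes = new ArrayList<>();
    // invokeAll blocks until every task has completed and preserves task order.
    for (Future<Outcome> future : pool.invokeAll(tasks)) {
      try {
        outcomes.add(future.get());
      } catch (ExecutionException e) {
        outcomes.add(new Outcome(null, e));
      }
    }
    return outcomes;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // Stand-in for the real HTTP call: fails for one request, succeeds otherwise.
      Function<String, String> sendPatch =
          request -> {
            if (request.equals("bad")) {
              throw new IllegalStateException("rejected: " + request);
            }
            return "ok:" + request;
          };
      for (Outcome o : patchBatch(List.of("patch-1", "bad", "patch-3"), sendPatch, pool)) {
        System.out.println(o.ok() ? o.body : "error: " + o.error.getMessage());
      }
    } finally {
      pool.shutdown();
    }
  }
}
```

In a DoFn, the pool would presumably be created in `@Setup` and shut down in `@Teardown`, with successes and failures routed to the existing output tags. Whether this pays off depends on how much of the per-element latency is network wait.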

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirPatchParameter.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * FhirPatchParemeter represents the parameters for a FHIR patch request, used as a parameter for
+ * {@link FhirIO.PatchResources}.
+ */
+@DefaultCoder(FhirPatchParameterCoder.class)
+public class FhirPatchParameter implements Serializable {

Review comment:
       It's not necessary, but you could replace this with an `@AutoValue`
class, which generates the hashing, `toString`, etc. for you. An AutoValue
class can also have its coder set up automatically.
   
   https://github.com/google/auto/blob/master/value/userguide/index.md

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
##########
@@ -161,14 +161,28 @@ Operation pollOperation(Operation operation, Long sleepMs)
   HttpBody executeFhirBundle(String fhirStore, String bundle)
       throws IOException, HealthcareHttpException;
 
+  /**
+   * Patch fhir resource http body.
+   *
+   * @param resourceName the resource name, in format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not
+   *     present when queryString is specified.
+   * @param patch the patch operation
+   * @param query optional query for conditional patches
+   * @return the http body
+   */
+  HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
+      throws IOException, HealthcareHttpException;
+
   /**
    * Read fhir resource http body.
    *
-   * @param resourceId the resource
+   * @param resourceName the resource name, in format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}

Review comment:
       It may be good to update the Javadoc for the PTransform that expects the
full resource name.
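
To make the expected format concrete, here is a small sketch that assembles a full resource name from its parts, following the pattern quoted in the Javadoc above. The class `FhirResourceNameSketch`, the method `resourceName`, and the sample values are hypothetical and only illustrate what a caller would need to pass.

```java
/** Hypothetical helper, not part of the PR: builds a full FHIR resource name. */
class FhirResourceNameSketch {

  /**
   * Returns a name of the form
   * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
   */
  static String resourceName(
      String project,
      String location,
      String dataset,
      String fhirStore,
      String resourceType,
      String id) {
    return String.format(
        "projects/%s/locations/%s/datasets/%s/fhirStores/%s/fhir/%s/%s",
        project, location, dataset, fhirStore, resourceType, id);
  }

  public static void main(String[] args) {
    // Sample values are made up for illustration.
    System.out.println(
        resourceName("my-project", "us-central1", "my-dataset", "my-store", "Patient", "123"));
  }
}
```

A Javadoc example along these lines on the PTransform would show users that the bare resource ID alone is not sufficient.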




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]