janeliulwq commented on a change in pull request #13395:
URL: https://github.com/apache/beam/pull/13395#discussion_r534564262



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1388,4 +1410,208 @@ public void deidentify(ProcessContext context)
       }
     }
   }
+
+  /** The type Search. */
+  public static class Search extends PTransform<PCollection<KV<String, 
Map<String, String>>>, FhirIO.Search.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
+
+    private final ValueProvider<String> fhirStore;
+
+    /**
+     * Creates a Search transform bound to the given FHIR store.
+     *
+     * @param fhirStore provider of the FHIR store identifier; stored as-is and later handed to
+     *     {@link SearchResourcesJsonString} when this transform is expanded
+     */
+    Search(ValueProvider<String> fhirStore) {
+      this.fhirStore = fhirStore;
+    }
+
+    /**
+     * Creates a Search transform bound to a fixed FHIR store.
+     *
+     * @param fhirStore the FHIR store identifier; wrapped in a {@link StaticValueProvider} so
+     *     both constructors end up holding a {@link ValueProvider}
+     */
+    Search(String fhirStore) {
+      this.fhirStore = StaticValueProvider.of(fhirStore);
+    }
+
+    /**
+     * The output of {@link FhirIO.Search}: the resources returned by successful searches and the
+     * errors of failed searches, both taken from a single {@link PCollectionTuple}.
+     */
+    public static class Result implements POutput, PInput {
+      private final PCollection<String> resources;
+
+      private final PCollection<HealthcareIOError<String>> failedSearches;
+      PCollectionTuple pct;
+
+      /**
+       * Create FhirIO.Search.Result from PCollectionTuple with OUT and DEAD_LETTER tags.
+       *
+       * @param pct the tuple holding the {@link FhirIO.Search#OUT} and {@link
+       *     FhirIO.Search#DEAD_LETTER} collections
+       * @return the search result
+       * @throws IllegalArgumentException if either of the two tags is missing from {@code pct}
+       */
+      static FhirIO.Search.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        // Ask the tuple directly for each tag. The previous check cast a TupleTagList to
+        // Collection<?>, but TupleTagList is not a java.util.Collection, so that cast fails with
+        // a ClassCastException before the intended IllegalArgumentException can be raised.
+        if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
+          return new FhirIO.Search.Result(pct);
+        }
+        throw new IllegalArgumentException(
+            "The PCollection tuple must have the FhirIO.Search.OUT "
+                + "and FhirIO.Search.DEAD_LETTER tuple tags");
+      }
+
+      /**
+       * Splits the tuple into its two collections.
+       *
+       * @param pct the tuple to read {@code OUT} and {@code DEAD_LETTER} from
+       */
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT);
+        // The dead-letter collection gets its coder set explicitly here.
+        this.failedSearches =
+            pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+      }
+
+      /**
+       * Gets failed searches.
+       *
+       * @return the collection read from the {@code DEAD_LETTER} tag
+       */
+      public PCollection<HealthcareIOError<String>> getFailedSearches() {
+        return failedSearches;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the collection read from the {@code OUT} tag
+       */
+      public PCollection<String> getResources() {
+        return resources;
+      }
+
+      /** Returns the pipeline of the underlying {@link PCollectionTuple}. */
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      /**
+       * Expands this result into its tagged outputs.
+       *
+       * <p>NOTE(review): only the {@code OUT} collection is exposed here; {@code failedSearches}
+       * is not part of the expansion. Confirm this is intended.
+       */
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      /** Intentionally a no-op: nothing is finalized on the outputs here. */
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of Fhir Messages. */
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};
+    /** The tag for the deadletter output of Fhir Messages. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+            new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Search.Result expand(PCollection<KV<String, Map<String, 
String>>> input) {
+      return input.apply("Fetch Fhir messages", new 
SearchResourcesJsonString(this.fhirStore));
+    }
+
+    /**
+     * DoFn to fetch resources from an Google Cloud Healthcare FHIR store 
based on search request
+     *
+     * <p>This DoFn consumes a {@link PCollection} of search requests 
consisting of resource type
+     * and search parameters, and fetches all matching resources based on the 
search criteria and
+     * will output a {@link PCollectionTuple} which contains the output and 
dead-letter {@link
+     * PCollection}*.
+     *
+     * <p>The {@link PCollectionTuple} output will contain the following 
{@link PCollection}:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Search#OUT} - Contains all {@link PCollection} 
records successfully search
+     *       from the Fhir store.
+     *   <li>{@link FhirIO.Search#DEAD_LETTER} - Contains all {@link 
PCollection} of {@link
+     *       HealthcareIOError}* of failed searches from the Fhir store, with
+     *       error message and stacktrace.
+     * </ul>
+     */
+    static class SearchResourcesJsonString
+            extends PTransform<PCollection<KV<String, Map<String, String>>>, 
FhirIO.Search.Result> {
+
+      private final ValueProvider<String> fhirStore;
+
+      /**
+       * Instantiates a new transform that searches Fhir resources.
+       *
+       * @param fhirStore provider of the FHIR store identifier, handed to the {@link
+       *     SearchResourcesFn} created when this transform is expanded
+       */
+      public SearchResourcesJsonString(ValueProvider<String> fhirStore) {
+        this.fhirStore = fhirStore;
+      }
+
+      /**
+       * Applies {@link SearchResourcesFn} to the requests with a main and a dead-letter output and
+       * wraps the resulting tuple in a {@link FhirIO.Search.Result}.
+       *
+       * @param resourceIds the search requests to execute
+       * @return the result wrapping the {@code OUT} and {@code DEAD_LETTER} collections
+       */
+      @Override
+      public FhirIO.Search.Result expand(PCollection<KV<String, Map<String, String>>> resourceIds) {
+        SearchResourcesFn searchFn = new SearchResourcesFn(this.fhirStore);
+        PCollectionTuple searchOutputs =
+            resourceIds.apply(
+                ParDo.of(searchFn)
+                    .withOutputTags(
+                        FhirIO.Search.OUT, TupleTagList.of(FhirIO.Search.DEAD_LETTER)));
+        return new FhirIO.Search.Result(searchOutputs);
+      }
+
+      /** DoFn for searching messages from the Fhir store with error handling. 
*/
+      static class SearchResourcesFn extends DoFn<KV<String, Map<String, 
String>>, String> {
+
+        private Counter failedSearches =
+                Metrics.counter(SearchResourcesFn.class, 
"failed-fhir-searches");
+        private static final Logger LOG = 
LoggerFactory.getLogger(SearchResourcesFn.class);
+        private final Counter successfulSearches =
+                Metrics.counter(SearchResourcesFn.class, 
"successful-fhir-searches");
+        private HealthcareApiClient client;
+        private final ValueProvider<String> fhirStore;
+
+        /**
+         * Instantiates a new Fhir resources search fn.
+         *
+         * @param fhirStore provider of the FHIR store identifier used for every search and
+         *     recorded on every dead-letter error
+         */
+        SearchResourcesFn(ValueProvider<String> fhirStore) {
+          this.fhirStore = fhirStore;
+        }
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * <p>Registered as this DoFn's {@code @Setup} method; it stores a new {@link
+         * HttpHealthcareApiClient} in the {@code client} field that {@code processElement} uses.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+        }
+
+        /**
+         * Runs one search request and emits its outcome.
+         *
+         * <p>The element key is passed to {@code searchResources} as the resource type and the
+         * element value as the search parameters. The returned string goes to the main output. If
+         * the search throws, the failure counter is incremented, a warning is logged and a {@link
+         * HealthcareIOError} carrying the FHIR store and the exception goes to {@link
+         * FhirIO.Search#DEAD_LETTER}.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          KV<String, Map<String, String>> elementValues = context.element();
+          // Resolve the provider with get(): toString() on a ValueProvider is a debug
+          // representation and, for runtime-provided values, is not the FHIR store itself.
+          String fhirStorePath = this.fhirStore.get();
+          try {
+            context.output(
+                searchResources(
+                    this.client, fhirStorePath, elementValues.getKey(), elementValues.getValue()));
+          } catch (Exception e) {
+            // Deliberate catch-all: a failed search is diverted to the dead-letter output
+            // instead of failing the bundle.
+            failedSearches.inc();
+            LOG.warn(
+                String.format(
+                    "Error search FHIR messages writing to Dead Letter "
+                        + "Queue. Cause: %s Stack Trace: %s",
+                    e.getMessage(), Throwables.getStackTraceAsString(e)));
+            context.output(FhirIO.Search.DEAD_LETTER, HealthcareIOError.of(fhirStorePath, e));
+          }
+        }
+
+        private String searchResources(HealthcareApiClient client, String 
fhirStore, String resourceType,
+                                       @Nullable Map<String, String> 
parameters)
+                throws IllegalArgumentException {
+          long startTime = System.currentTimeMillis();
+
+          HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator 
iter =
+                  new 
HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator(
+                          client, fhirStore, resourceType, parameters);
+          JsonArray result = new JsonArray();
+          while (iter.hasNext()) {
+            result.addAll(iter.next());
+          }
+          this.successfulSearches.inc();
+          return result.toString();

Review comment:
       I think that, for our use case, the iterator will usually produce only 
one result. It definitely makes sense to avoid one of the two calls, so that is done.
   
   I assume you are talking about adding metrics on our end, but let me know if I 
misunderstood and there is anything to be added here. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to