This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce29e7  Merge pull request #15936 from [BEAM-13351] FhirIO 
GetPatientEverything connector
5ce29e7 is described below

commit 5ce29e7a6f835edc02698ffdfd3dfb5e4287edd1
Author: Milena Bukal <[email protected]>
AuthorDate: Tue Nov 30 11:18:09 2021 -0400

    Merge pull request #15936 from [BEAM-13351] FhirIO GetPatientEverything 
connector
    
    * GetPatientEverything implementation
    
    * Adding tests + comments
    
    * Merge branch 'PATIENT_SOMETHING' of https://github.com/msbukal/beam into 
PATIENT_SOMETHING
    
    * Fix Presubmit warnings/errors
    
    * Fix checkStyle
    
    * Review fixes
    
    * Review fixes 2
---
 .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java  | 295 +++++++++++----------
 .../io/gcp/healthcare/FhirIOPatientEverything.java | 237 +++++++++++++++++
 .../sdk/io/gcp/healthcare/FhirSearchParameter.java |  41 ++-
 .../sdk/io/gcp/healthcare/HealthcareApiClient.java |  18 +-
 .../io/gcp/healthcare/HttpHealthcareApiClient.java | 225 ++++++++--------
 .../gcp/healthcare/FhirIOPatientEverythingIT.java  | 142 ++++++++++
 .../beam/sdk/io/gcp/healthcare/FhirIOTest.java     |  28 +-
 .../beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java |  27 +-
 8 files changed, 741 insertions(+), 272 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 6ca2395..7e7d351 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -54,6 +54,8 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import 
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
 import 
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
@@ -94,20 +96,25 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Reading</h3>
  *
- * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use 
cases where you have
- * a ${@link PCollection} of message IDs. This is appropriate for reading the 
Fhir notifications
- * from a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases 
where you have a
- * manually prepared list of messages that you need to process (e.g. in a text 
file read with {@link
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use 
cases where you have a
+ * {@link PCollection} of FHIR resource names in the format of 
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+ * This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases 
where you have a manually
+ * prepared list of resources that you need to process (e.g. in a text file 
read with {@link
  * org.apache.beam.sdk.io.TextIO}*) .
  *
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} 
of message ID
- * strings {@link FhirIO.Read.Result} where one can call {@link 
Read.Result#getResources()} to
- * retrieve a {@link PCollection} containing the successfully fetched {@link 
String}s and/or {@link
- * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of 
{@link
- * HealthcareIOError}* containing the resource ID that could not be fetched 
and the exception as a
+ * <p>Get Resource contents from the FHIR Store based on the {@link 
PCollection} of FHIR resource name strings.
+ * This returns a {@link FhirIO.Read.Result} where one can call {@link 
Read.Result#getResources()} to retrieve a
+ * {@link PCollection} containing the successfully fetched json resources as 
{@link String}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of 
{@link HealthcareIOError}
+ * containing the resources that could not be fetched and the exception as a
  * {@link HealthcareIOError}, this can be used to write to the dead letter 
storage system of your
  * choosing. This error handling is mainly to transparently surface errors 
where the upstream {@link
- * PCollection}* contains IDs that are not valid or are not reachable due to 
permissions issues.
+ * PCollection} contains FHIR resources that are not valid or are not 
reachable due to permissions issues.
+ *
+ * <p>Additionally, you can query an entire FHIR Patient resource's compartment 
(resources that
+ * refer to the patient, and are referred to by the patient) by calling {@link 
FhirIO#getPatientEverything} to
+ * execute a FHIR GetPatientEverythingRequest.
  *
  * <h3>Writing</h3>
  *
@@ -157,12 +164,12 @@ import org.slf4j.LoggerFactory;
  *     
PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription()))
  *   .apply(FhirIO.readResources());
  *
- * // happily retrived messages
+ * // happily retrieved resources
  * PCollection<String> resources = readResult.getResources();
- * // message IDs that couldn't be retrieved + error context
+ * // resource paths that couldn't be retrieved + error context
  * PCollection<HealthcareIOError<String>> failedReads = 
readResult.getFailedReads();
  *
- * failedReads.apply("Write Message IDs / Stacktrace for Failed Reads to 
BigQuery",
+ * failedReads.apply("Write Resources / Stacktrace for Failed Reads to 
BigQuery",
  *     BigQueryIO
  *         .write()
  *         .to(option.getBQFhirExecuteBundlesDeadLetterTable())
@@ -229,7 +236,7 @@ import org.slf4j.LoggerFactory;
 })
 public class FhirIO {
 
-  private static final String BASE_METRIC_PREFIX = "fhirio/";
+  static final String BASE_METRIC_PREFIX = "fhirio/";
   private static final String LRO_COUNTER_KEY = "counter";
   private static final String LRO_SUCCESS_KEY = "success";
   private static final String LRO_FAILURE_KEY = "failure";
@@ -372,6 +379,18 @@ public class FhirIO {
   }
 
   /**
+   * Get the patient compartment for a FHIR Patient using the 
GetPatientEverything/$everything API.
+   *
+   * @see <a
+   *     
href=https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/Patient-everything></a>
+   * @return the patient everything
+   * @see FhirIOPatientEverything
+   */
+  public static FhirIOPatientEverything getPatientEverything() {
+    return new FhirIOPatientEverything();
+  }
+
+  /**
    * Increments success and failure counters for an LRO. To be used after the 
LRO has completed.
    * This function leverages the fact that the LRO metadata is always of the 
format: "counter": {
    * "success": "1", "failure": "1" }
@@ -494,22 +513,22 @@ public class FhirIO {
           String transformName, PInput input, PTransform<?, ?> transform) {}
     }
 
-    /** The tag for the main output of Fhir Messages. */
+    /** The tag for the main output of FHIR resources. */
     public static final TupleTag<String> OUT = new TupleTag<String>() {};
-    /** The tag for the deadletter output of Fhir Messages. */
+    /** The tag for the deadletter output of FHIR resources. */
     public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
         new TupleTag<HealthcareIOError<String>>() {};
 
     @Override
     public FhirIO.Read.Result expand(PCollection<String> input) {
-      return input.apply("Fetch Fhir messages", new FetchResourceJsonString());
+      return input.apply("Read FHIR Resources", new GetResourceJsonString());
     }
 
     /**
-     * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store 
based on resourceID
+     * DoFn to fetch a resource from a Google Cloud Healthcare FHIR store 
based on resource path.
      *
      * <p>This DoFn consumes a {@link PCollection} of notifications {@link 
String}s from the FHIR
-     * store, and fetches the actual {@link String} object based on the id in 
the notification and
+     * store, and fetches the actual {@link String} object based on the path 
in the notification and
      * will output a {@link PCollectionTuple} which contains the output and 
dead-letter {@link
      * PCollection}*.
      *
@@ -519,15 +538,14 @@ public class FhirIO {
      *   <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} 
records successfully read
      *       from the Fhir store.
      *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link 
PCollection} of {@link
-     *       HealthcareIOError}* of message IDs which failed to be fetched 
from the Fhir store, with
+     *       HealthcareIOError}* of resources which failed to be fetched from 
the FHIR store, with
      *       error message and stacktrace.
      * </ul>
      */
-    static class FetchResourceJsonString
-        extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    static class GetResourceJsonString extends PTransform<PCollection<String>, 
FhirIO.Read.Result> {
 
-      /** Instantiates a new Fetch Fhir message DoFn. */
-      public FetchResourceJsonString() {}
+      /** Instantiates a new Get FHIR resource DoFn. */
+      public GetResourceJsonString() {}
 
       @Override
       public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
@@ -537,7 +555,7 @@ public class FhirIO {
                     .withOutputTags(FhirIO.Read.OUT, 
TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
       }
 
-      /** DoFn for fetching messages from the Fhir store with error handling. 
*/
+      /** DoFn for getting resources from the FHIR store with error handling. 
*/
       static class ReadResourceFn extends DoFn<String, String> {
 
         private static final Logger LOG = 
LoggerFactory.getLogger(ReadResourceFn.class);
@@ -553,7 +571,7 @@ public class FhirIO {
         private HealthcareApiClient client;
         private ObjectMapper mapper;
 
-        /** Instantiates a new Hl 7 v 2 message get fn. */
+        /** Instantiates a new get FHIR resource fn. */
         ReadResourceFn() {}
 
         /**
@@ -581,22 +599,22 @@ public class FhirIO {
             READ_RESOURCE_ERRORS.inc();
             LOG.warn(
                 String.format(
-                    "Error fetching Fhir message with ID %s writing to Dead 
Letter "
+                    "Error fetching Fhir resource with ID %s writing to Dead 
Letter "
                         + "Queue. Cause: %s Stack Trace: %s",
                     resourceId, e.getMessage(), 
Throwables.getStackTraceAsString(e)));
             context.output(FhirIO.Read.DEAD_LETTER, 
HealthcareIOError.of(resourceId, e));
           }
         }
 
-        private String fetchResource(HealthcareApiClient client, String 
resourceId)
+        private String fetchResource(HealthcareApiClient client, String 
resourceName)
             throws IOException, IllegalArgumentException {
           long startTime = Instant.now().toEpochMilli();
 
-          HttpBody resource = client.readFhirResource(resourceId);
+          HttpBody resource = client.readFhirResource(resourceName);
           READ_RESOURCE_LATENCY_MS.update(Instant.now().toEpochMilli() - 
startTime);
 
           if (resource == null) {
-            throw new IOException(String.format("GET request for %s returned 
null", resourceId));
+            throw new IOException(String.format("GET request for %s returned 
null", resourceName));
           }
           READ_RESOURCE_SUCCESS.inc();
           return mapper.writeValueAsString(resource);
@@ -823,10 +841,10 @@ public class FhirIO {
     }
 
     /**
-     * Create Method creates a single FHIR resource. @see <a
-     * 
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/create></a>
+     * Import method for batch writing resources. @see <a
+     * 
href=https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import></a>
      *
-     * @param fhirStore the hl 7 v 2 store
+     * @param fhirStore the FHIR store
      * @param gcsTempPath the gcs temp path
      * @param gcsDeadLetterPath the gcs dead letter path
      * @param contentStructure the content structure
@@ -873,7 +891,8 @@ public class FhirIO {
     }
 
     /**
-     * Execute Bundle Method executes a batch of requests as a single 
transaction @see <a
+     * Execute Bundle Method executes a batch of requests in batch or as a 
single transaction @see
+     * <a
      * 
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
      *
      * @param fhirStore the fhir store
@@ -887,7 +906,9 @@ public class FhirIO {
     }
 
     /**
-     * Execute bundles write.
+     * Execute Bundle Method executes a batch of requests in batch or as a 
single transaction @see
+     * <a
+     * 
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
      *
      * @param fhirStore the fhir store
      * @return the write
@@ -921,15 +942,8 @@ public class FhirIO {
           return input.apply(new Import(getFhirStore(), tempPath, deadPath, 
contentStructure));
         case EXECUTE_BUNDLE:
         default:
-          bundles =
-              input.apply(
-                  "Execute FHIR Bundles",
-                  ParDo.of(new 
ExecuteBundles.ExecuteBundlesFn(this.getFhirStore()))
-                      .withOutputTags(SUCCESSFUL_BODY, 
TupleTagList.of(FAILED_BODY)));
-          bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
-          
bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+          return input.apply(new ExecuteBundles(this.getFhirStore()));
       }
-      return Result.in(input.getPipeline(), bundles);
     }
   }
 
@@ -1159,7 +1173,7 @@ public class FhirIO {
           String resource =
               String.format(
                   "Failed to parse payload: %s as json at: %s : %s."
-                      + "Dropping message from batch import.",
+                      + "Dropping resource from batch import.",
                   httpBody, e.getLocation().getCharOffset(), e.getMessage());
           LOG.warn(resource);
           context.output(
@@ -1326,7 +1340,7 @@ public class FhirIO {
   }
 
   /** The type Execute bundles. */
-  public static class ExecuteBundles extends PTransform<PCollection<String>, 
Write.Result> {
+  public static class ExecuteBundles extends Write {
 
     private final ValueProvider<String> fhirStore;
 
@@ -1339,13 +1353,29 @@ public class FhirIO {
       this.fhirStore = fhirStore;
     }
 
-    /**
-     * Instantiates a new Execute bundles.
-     *
-     * @param fhirStore the fhir store
-     */
-    ExecuteBundles(String fhirStore) {
-      this.fhirStore = StaticValueProvider.of(fhirStore);
+    @Override
+    ValueProvider<String> getFhirStore() {
+      return fhirStore;
+    }
+
+    @Override
+    WriteMethod getWriteMethod() {
+      return WriteMethod.EXECUTE_BUNDLE;
+    }
+
+    @Override
+    Optional<ContentStructure> getContentStructure() {
+      return Optional.empty();
+    }
+
+    @Override
+    Optional<ValueProvider<String>> getImportGcsTempPath() {
+      return Optional.empty();
+    }
+
+    @Override
+    Optional<ValueProvider<String>> getImportGcsDeadLetterPath() {
+      return Optional.empty();
     }
 
     @Override
@@ -1666,16 +1696,20 @@ public class FhirIO {
           String transformName, PInput input, PTransform<?, ?> transform) {}
     }
 
-    /** The tag for the main output of Fhir Messages. */
+    /** The tag for the main output of FHIR Resources from a search. */
     public static final TupleTag<KV<String, JsonArray>> OUT =
         new TupleTag<KV<String, JsonArray>>() {};
-    /** The tag for the deadletter output of Fhir Messages. */
+    /** The tag for the deadletter output of FHIR Resources. */
     public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
         new TupleTag<HealthcareIOError<String>>() {};
 
     @Override
     public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>> 
input) {
-      return input.apply("Fetch Fhir messages", new 
SearchResourcesJsonString(this.fhirStore));
+      PCollectionTuple results =
+          input.apply(
+              ParDo.of(new SearchResourcesFn(this.fhirStore))
+                  .withOutputTags(FhirIO.Search.OUT, 
TupleTagList.of(FhirIO.Search.DEAD_LETTER)));
+      return FhirIO.Search.Result.of(results);
     }
 
     /**
@@ -1696,108 +1730,81 @@ public class FhirIO {
      *       stacktrace.
      * </ul>
      */
-    class SearchResourcesJsonString
-        extends PTransform<PCollection<FhirSearchParameter<T>>, 
FhirIO.Search.Result> {
+    class SearchResourcesFn extends DoFn<FhirSearchParameter<T>, KV<String, 
JsonArray>> {
 
+      private final Counter searchResourcesErrorCount =
+          Metrics.counter(
+              SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_error_count");
+      private final Counter searchResourcesSuccessCount =
+          Metrics.counter(
+              SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_success_count");
+      private final Distribution searchResourcesLatencyMs =
+          Metrics.distribution(
+              SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_latency_ms");
+      private final Logger log = 
LoggerFactory.getLogger(SearchResourcesFn.class);
+
+      private HealthcareApiClient client;
       private final ValueProvider<String> fhirStore;
 
-      public SearchResourcesJsonString(ValueProvider<String> fhirStore) {
+      /** Instantiates a new FHIR resources search fn. */
+      SearchResourcesFn(ValueProvider<String> fhirStore) {
         this.fhirStore = fhirStore;
       }
 
-      @Override
-      public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>> 
resourceIds) {
-        return new FhirIO.Search.Result(
-            resourceIds.apply(
-                ParDo.of(new SearchResourcesFn(this.fhirStore))
-                    .withOutputTags(
-                        FhirIO.Search.OUT, 
TupleTagList.of(FhirIO.Search.DEAD_LETTER))));
+      /**
+       * Instantiate healthcare client.
+       *
+       * @throws IOException the io exception
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
       }
 
-      /** DoFn for searching messages from the Fhir store with error handling. 
*/
-      class SearchResourcesFn extends DoFn<FhirSearchParameter<T>, KV<String, 
JsonArray>> {
-
-        private final Counter searchResourceErrors =
-            Metrics.counter(
-                SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_error_count");
-        private final Counter searchResourceSuccess =
-            Metrics.counter(
-                SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_success_count");
-        private final Distribution searchResourceLatencyMs =
-            Metrics.distribution(
-                SearchResourcesFn.class, BASE_METRIC_PREFIX + 
"search_resource_latency_ms");
-
-        private final Logger log = 
LoggerFactory.getLogger(SearchResourcesFn.class);
-        private HealthcareApiClient client;
-        private final ValueProvider<String> fhirStore;
-
-        /** Instantiates a new Fhir resources search fn. */
-        SearchResourcesFn(ValueProvider<String> fhirStore) {
-          this.fhirStore = fhirStore;
+      /**
+       * Process element.
+       *
+       * @param context the context
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        FhirSearchParameter<T> fhirSearchParameters = context.element();
+        try {
+          context.output(
+              KV.of(
+                  fhirSearchParameters.getKey(),
+                  searchResources(
+                      fhirSearchParameters.getResourceType(), 
fhirSearchParameters.getQueries())));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          searchResourcesErrorCount.inc();
+          log.warn(
+              String.format(
+                  "Error search FHIR resources writing to Dead Letter "
+                      + "Queue. Cause: %s Stack Trace: %s",
+                  e.getMessage(), Throwables.getStackTraceAsString(e)));
+          context.output(
+              FhirIO.Search.DEAD_LETTER, 
HealthcareIOError.of(fhirSearchParameters.toString(), e));
         }
+      }
 
-        /**
-         * Instantiate healthcare client.
-         *
-         * @throws IOException the io exception
-         */
-        @Setup
-        public void instantiateHealthcareClient() throws IOException {
-          this.client = new HttpHealthcareApiClient();
-        }
+      private JsonArray searchResources(String resourceType, @Nullable 
Map<String, T> parameters)
+          throws NoSuchElementException {
+        long start = Instant.now().toEpochMilli();
 
-        /**
-         * Process element.
-         *
-         * @param context the context
-         */
-        @ProcessElement
-        public void processElement(ProcessContext context) {
-          FhirSearchParameter<T> fhirSearchParameters = context.element();
-          try {
-            context.output(
-                KV.of(
-                    fhirSearchParameters.getKey(),
-                    searchResources(
-                        this.client,
-                        this.fhirStore.toString(),
-                        fhirSearchParameters.getResourceType(),
-                        fhirSearchParameters.getQueries())));
-          } catch (IllegalArgumentException | NoSuchElementException e) {
-            searchResourceErrors.inc();
-            log.warn(
-                String.format(
-                    "Error search FHIR messages writing to Dead Letter "
-                        + "Queue. Cause: %s Stack Trace: %s",
-                    e.getMessage(), Throwables.getStackTraceAsString(e)));
-            context.output(
-                FhirIO.Search.DEAD_LETTER, 
HealthcareIOError.of(this.fhirStore.toString(), e));
-          }
+        HashMap<String, Object> parameterObjects = new HashMap<>();
+        if (parameters != null) {
+          parameters.forEach(parameterObjects::put);
         }
-
-        private JsonArray searchResources(
-            HealthcareApiClient client,
-            String fhirStore,
-            String resourceType,
-            @Nullable Map<String, T> parameters)
-            throws NoSuchElementException {
-          long start = Instant.now().toEpochMilli();
-
-          HashMap<String, Object> parameterObjects = new HashMap<>();
-          if (parameters != null) {
-            parameters.forEach(parameterObjects::put);
-          }
-          HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator 
iter =
-              new 
HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator(
-                  client, fhirStore, resourceType, parameterObjects);
-          JsonArray result = new JsonArray();
-          while (iter.hasNext()) {
-            result.addAll(iter.next());
-          }
-          
searchResourceLatencyMs.update(java.time.Instant.now().toEpochMilli() - start);
-          searchResourceSuccess.inc();
-          return result;
+        FhirResourcePagesIterator iter =
+            FhirResourcePagesIterator.ofSearch(
+                client, fhirStore.toString(), resourceType, parameterObjects);
+        JsonArray result = new JsonArray();
+        while (iter.hasNext()) {
+          result.addAll(iter.next());
         }
+        searchResourcesLatencyMs.update(java.time.Instant.now().toEpochMilli() 
- start);
+        searchResourcesSuccessCount.inc();
+        return result;
       }
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
new file mode 100644
index 0000000..8d91536
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.BASE_METRIC_PREFIX;
+
+import com.google.auto.value.AutoValue;
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import 
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import 
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The type FhirIOPatientEverything for querying a FHIR Patient resource's 
compartment. */
+public class FhirIOPatientEverything
+    extends PTransform<PCollection<PatientEverythingParameter>, 
FhirIOPatientEverything.Result> {
+
+  /** The tag for the main output of FHIR Resources from a 
GetPatientEverything request. */
+  public static final TupleTag<JsonArray> OUT = new TupleTag<JsonArray>() {};
+  /** The tag for the deadletter output of FHIR Resources from a 
GetPatientEverything request. */
+  public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+      new TupleTag<HealthcareIOError<String>>() {};
+
+  /**
+   * PatientEverythingParameter defines required attributes for a FHIR 
GetPatientEverything request
+   * in {@link FhirIOPatientEverything}.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class PatientEverythingParameter implements 
Serializable {
+
+    /**
+     * FHIR Patient resource name in the format of
+     * 
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     */
+    abstract String getResourceName();
+    /** Optional filters for the request, e.g. start, end, _type, _since, 
_count. */
+    abstract @Nullable Map<String, String> getFilters();
+
+    static Builder builder() {
+      return new 
AutoValue_FhirIOPatientEverything_PatientEverythingParameter.Builder();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setResourceName(String resourceName);
+
+      abstract Builder setFilters(Map<String, String> query);
+
+      abstract PatientEverythingParameter build();
+    }
+  }
+
+  /** The Result for a {@link FhirIOPatientEverything} request. */
+  public static class Result implements POutput, PInput {
+
+    private final PCollection<JsonArray> patientCompartments;
+    private final PCollection<HealthcareIOError<String>> failedReads;
+
+    PCollectionTuple pct;
+
+    /**
+     * Create FhirIOPatientEverything.Result from PCollectionTuple with OUT 
and DEAD_LETTER tags.
+     *
+     * @param pct the pct
+     * @return the patient everything result
+     * @throws IllegalArgumentException the illegal argument exception
+     */
+    static Result of(PCollectionTuple pct) throws IllegalArgumentException {
+      if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
+        return new Result(pct);
+      } else {
+        throw new IllegalArgumentException(
+            "The PCollection tuple must have the FhirIOPatientEverything.OUT "
+                + "and FhirIOPatientEverything.DEAD_LETTER tuple tags");
+      }
+    }
+
+    private Result(PCollectionTuple pct) {
+      this.pct = pct;
+      this.patientCompartments = pct.get(OUT).setCoder(JsonArrayCoder.of());
+      this.failedReads =
+          
pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+    }
+
+    /**
+     * Gets failed reads.
+     *
+     * @return the failed reads
+     */
+    public PCollection<HealthcareIOError<String>> getFailedReads() {
+      return failedReads;
+    }
+
+    /**
+     * Gets the patient compartment responses for GetPatientEverything 
requests.
+     *
+     * @return the read patient compartments
+     */
+    public PCollection<JsonArray> getPatientCompartments() {
+      return patientCompartments;
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return this.pct.getPipeline();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      return ImmutableMap.of(OUT, patientCompartments, DEAD_LETTER, 
failedReads);
+    }
+
+    @Override
+    public void finishSpecifyingOutput(
+        String transformName, PInput input, PTransform<?, ?> transform) {}
+  }
+
+  @Override
+  public Result expand(PCollection<PatientEverythingParameter> input) {
+    PCollectionTuple results =
+        input.apply(
+            "GetPatientEverything",
+            ParDo.of(new GetPatientEverythingFn())
+                .withOutputTags(OUT, TupleTagList.of(DEAD_LETTER)));
+    return new Result(results);
+  }
+
+  /** GetPatientEverythingFn executes a GetPatientEverything request. */
+  static class GetPatientEverythingFn extends DoFn<PatientEverythingParameter, 
JsonArray> {
+
+    private static final Counter GET_PATIENT_EVERYTHING_ERROR_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_error_count");
+    private static final Counter GET_PATIENT_EVERYTHING_SUCCESS_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_success_count");
+    private static final Distribution GET_PATIENT_EVERYTHING_LATENCY_MS =
+        Metrics.distribution(
+            GetPatientEverythingFn.class, BASE_METRIC_PREFIX + 
"get_patient_everything_latency_ms");
+    private static final Logger LOG = 
LoggerFactory.getLogger(GetPatientEverythingFn.class);
+
+    @SuppressWarnings("initialization.fields.uninitialized")
+    private HealthcareApiClient client;
+
+    /**
+     * Instantiate healthcare client.
+     *
+     * @throws IOException the io exception
+     */
+    @Setup
+    public void instantiateHealthcareClient() throws IOException {
+      this.client = new HttpHealthcareApiClient();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      PatientEverythingParameter patientEverythingParameter = 
context.element();
+      try {
+        context.output(
+            getPatientEverything(
+                patientEverythingParameter.getResourceName(),
+                patientEverythingParameter.getFilters()));
+      } catch (IllegalArgumentException | NoSuchElementException e) {
+        GET_PATIENT_EVERYTHING_ERROR_COUNT.inc();
+        LOG.warn(
+            String.format(
+                "Error executing GetPatientEverything: FHIR resources writing 
to Dead Letter "
+                    + "Queue. Cause: %s Stack Trace: %s",
+                e.getMessage(), Throwables.getStackTraceAsString(e)));
+        context.output(DEAD_LETTER, 
HealthcareIOError.of(patientEverythingParameter.toString(), e));
+      }
+    }
+
+    private JsonArray getPatientEverything(
+        String resourceName, @Nullable Map<String, String> filters) {
+      long start = Instant.now().toEpochMilli();
+
+      HashMap<String, Object> filterObjects = new HashMap<>();
+      if (filters != null) {
+        filterObjects.putAll(filters);
+      }
+      FhirResourcePagesIterator iter =
+          FhirResourcePagesIterator.ofPatientEverything(client, resourceName, 
filterObjects);
+      JsonArray result = new JsonArray();
+      while (iter.hasNext()) {
+        result.addAll(iter.next());
+      }
+      
GET_PATIENT_EVERYTHING_LATENCY_MS.update(java.time.Instant.now().toEpochMilli() 
- start);
+      GET_PATIENT_EVERYTHING_SUCCESS_COUNT.inc();
+      return result;
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index 6f80d86..87692d8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -29,10 +29,17 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @DefaultCoder(FhirSearchParameterCoder.class)
 public class FhirSearchParameter<T> {
 
+  /** FHIR resource type. */
   private final String resourceType;
-  // The key is used as a key for the search query, if there is source 
information to propagate
-  // through the pipeline.
+  /**
+   * The key is used as a key for the search query, if there is source 
information to propagate
+   * through the pipeline.
+   */
   private final String key;
+  /**
+   * The search query. For an OR search, put both query values in a single 
string. For an AND
+   * search, use a list.
+   */
   private final @Nullable Map<String, T> queries;
 
   private FhirSearchParameter(
@@ -46,11 +53,28 @@ public class FhirSearchParameter<T> {
     this.queries = queries;
   }
 
+  /**
+   * Creates a FhirSearchParameter of type T.
+   *
+   * @param resourceType the FHIR resource type.
+   * @param key Key for the search query to key the results with.
+   * @param queries The search query.
+   * @param <T> The type of the search query value.
+   * @return The FhirSearchParameter of type T.
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable String key, @Nullable Map<String, T> 
queries) {
     return new FhirSearchParameter<>(resourceType, key, queries);
   }
 
+  /**
+   * Creates a FhirSearchParameter of type T, without a key.
+   *
+   * @param resourceType the FHIR resource type.
+   * @param queries The search query.
+   * @param <T> The type of the search query value.
+   * @return The FhirSearchParameter of type T.
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable Map<String, T> queries) {
     return new FhirSearchParameter<>(resourceType, null, queries);
@@ -89,15 +113,8 @@ public class FhirSearchParameter<T> {
 
   @Override
   public String toString() {
-    return "FhirSearchParameter{"
-        + "resourceType='"
-        + resourceType
-        + '\''
-        + ", key='"
-        + key
-        + '\''
-        + ", queries="
-        + queries
-        + '}';
+    return String.format(
+        "FhirSearchParameter{resourceType='%s', key='%s', queries='%s'}'",
+        resourceType, key, queries);
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index 13b2d5c..afda939 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -164,11 +164,12 @@ public interface HealthcareApiClient {
   /**
    * Read fhir resource http body.
    *
-   * @param resourceId the resource
+   * @param resourceName the resource name, in format
+   *     
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
    * @return the http body
    * @throws IOException the io exception
    */
-  HttpBody readFhirResource(String resourceId) throws IOException;
+  HttpBody readFhirResource(String resourceName) throws IOException;
 
   /**
    * Search fhir resource http body.
@@ -187,6 +188,19 @@ public interface HealthcareApiClient {
       throws IOException;
 
   /**
+   * Fhir get patient everything http body.
+   *
+   * @param resourceName the resource name, in format
+   *     
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
+   * @param filters optional request filters
+   * @return the http body
+   * @throws IOException the io exception
+   */
+  HttpBody getPatientEverything(
+      String resourceName, @Nullable Map<String, Object> filters, String 
pageToken)
+      throws IOException;
+
+  /**
    * Create hl 7 v 2 store hl 7 v 2 store.
    *
    * @param dataset the dataset
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 0fdd721..13dfe28 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -25,6 +25,7 @@ import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.healthcare.v1.CloudHealthcare;
+import 
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.PatientEverything;
 import 
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.Search;
 import 
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.Hl7V2Stores.Messages;
 import com.google.api.services.healthcare.v1.CloudHealthcareScopes;
@@ -629,8 +630,15 @@ public class HttpHealthcareApiClient implements 
HealthcareApiClient, Serializabl
   }
 
   @Override
-  public HttpBody readFhirResource(String resourceId) throws IOException {
-    return 
client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
+  public HttpBody readFhirResource(String resourceName) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .read(resourceName)
+        .execute();
   }
 
   @Override
@@ -652,6 +660,27 @@ public class HttpHealthcareApiClient implements 
HealthcareApiClient, Serializabl
     return search.execute();
   }
 
+  @Override
+  public HttpBody getPatientEverything(
+      String resourceName, @Nullable Map<String, Object> filters, String 
pageToken)
+      throws IOException {
+    PatientEverything patientEverything =
+        client
+            .projects()
+            .locations()
+            .datasets()
+            .fhirStores()
+            .fhir()
+            .patientEverything(resourceName);
+    if (filters != null && !filters.isEmpty()) {
+      filters.forEach(patientEverything::set);
+    }
+    if (pageToken != null && !pageToken.isEmpty()) {
+      patientEverything.set("_page_token", URLDecoder.decode(pageToken, 
"UTF-8"));
+    }
+    return patientEverything.execute();
+  }
+
   public static class AuthenticatedRetryInitializer extends 
RetryHttpRequestInitializer {
 
     GoogleCredentials credentials;
@@ -862,147 +891,135 @@ public class HttpHealthcareApiClient implements 
HealthcareApiClient, Serializabl
     }
   }
 
-  public static class FhirResourcePages implements Iterable<JsonArray> {
+  /** The type FhirResourcePagesIterator for methods which return paged 
output. */
+  public static class FhirResourcePagesIterator implements Iterator<JsonArray> 
{
+
+    public enum FhirMethod {
+      SEARCH,
+      PATIENT_EVERYTHING
+    }
 
+    private final FhirMethod fhirMethod;
     private final String fhirStore;
     private final String resourceType;
+    private final String resourceName;
     private final Map<String, Object> parameters;
-    private transient HealthcareApiClient client;
 
-    /**
-     * Instantiates a new Fhir resource pages.
-     *
-     * @param client the client
-     * @param fhirStore the Fhir store
-     * @param resourceType the Fhir resource type to search for
-     * @param parameters the search parameters
-     */
-    FhirResourcePages(
+    private final HealthcareApiClient client;
+    private final ObjectMapper mapper;
+    private String pageToken;
+    private boolean isFirstRequest;
+
+    private FhirResourcePagesIterator(
+        FhirMethod fhirMethod,
         HealthcareApiClient client,
         String fhirStore,
         String resourceType,
+        String resourceName,
         @Nullable Map<String, Object> parameters) {
+      this.fhirMethod = fhirMethod;
       this.client = client;
       this.fhirStore = fhirStore;
       this.resourceType = resourceType;
+      this.resourceName = resourceName;
       this.parameters = parameters;
+      this.pageToken = null;
+      this.isFirstRequest = true;
+      this.mapper = new ObjectMapper();
     }
 
     /**
-     * Make search request.
+     * Instantiates a new search FHIR resource pages iterator.
      *
      * @param client the client
      * @param fhirStore the Fhir store
      * @param resourceType the Fhir resource type to search for
-     * @param parameters the search parameters
-     * @param pageToken the page token
-     * @return the search response
-     * @throws IOException the io exception
+     * @param parameters the query parameters
      */
-    public static HttpBody makeSearchRequest(
+    public static FhirResourcePagesIterator ofSearch(
         HealthcareApiClient client,
         String fhirStore,
         String resourceType,
-        @Nullable Map<String, Object> parameters,
-        String pageToken)
-        throws IOException {
-      return client.searchFhirResource(fhirStore, resourceType, parameters, 
pageToken);
+        @Nullable Map<String, Object> parameters) {
+      return new FhirResourcePagesIterator(
+          FhirMethod.SEARCH, client, fhirStore, resourceType, "", parameters);
     }
 
-    @Override
-    public Iterator<JsonArray> iterator() {
+    /**
+     * Instantiates a new GetPatientEverything FHIR resource pages iterator.
+     *
+     * @param client the client
+     * @param resourceName the FHIR resource name
+     * @param parameters the filter parameters
+     */
+    public static FhirResourcePagesIterator ofPatientEverything(
+        HealthcareApiClient client, String resourceName, @Nullable Map<String, 
Object> parameters) {
       return new FhirResourcePagesIterator(
-          this.client, this.fhirStore, this.resourceType, this.parameters);
+          FhirMethod.PATIENT_EVERYTHING, client, "", "", resourceName, 
parameters);
     }
 
-    /** The type Fhir resource pages iterator. */
-    public static class FhirResourcePagesIterator implements 
Iterator<JsonArray> {
-
-      private final String fhirStore;
-      private final String resourceType;
-      private final Map<String, Object> parameters;
-      private HealthcareApiClient client;
-      private String pageToken;
-      private boolean isFirstRequest;
-      private ObjectMapper mapper;
-
-      /**
-       * Instantiates a new Fhir resource pages iterator.
-       *
-       * @param client the client
-       * @param fhirStore the Fhir store
-       * @param resourceType the Fhir resource type to search for
-       * @param parameters the search parameters
-       */
-      FhirResourcePagesIterator(
-          HealthcareApiClient client,
-          String fhirStore,
-          String resourceType,
-          @Nullable Map<String, Object> parameters) {
-        this.client = client;
-        this.fhirStore = fhirStore;
-        this.resourceType = resourceType;
-        this.parameters = parameters;
-        this.pageToken = null;
-        this.isFirstRequest = true;
-        this.mapper = new ObjectMapper();
+    @Override
+    public boolean hasNext() throws NoSuchElementException {
+      if (!isFirstRequest) {
+        return this.pageToken != null && !this.pageToken.isEmpty();
       }
+      try {
+        HttpBody response = executeFhirRequest();
+        JsonObject jsonResponse =
+            
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
+        JsonArray resources = jsonResponse.getAsJsonArray("entry");
+        return resources != null && resources.size() != 0;
+      } catch (IOException e) {
+        throw new NoSuchElementException(
+            String.format(
+                "Failed to list first page of FHIR resources from %s: %s",
+                fhirStore, e.getMessage()));
+      }
+    }
 
-      @Override
-      public boolean hasNext() throws NoSuchElementException {
-        if (!isFirstRequest) {
-          return this.pageToken != null && !this.pageToken.isEmpty();
-        }
-        try {
-          HttpBody response =
-              makeSearchRequest(client, fhirStore, resourceType, parameters, 
this.pageToken);
-          JsonObject jsonResponse =
-              
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
-          JsonArray resources = jsonResponse.getAsJsonArray("entry");
-          return resources != null && resources.size() != 0;
-        } catch (IOException e) {
-          throw new NoSuchElementException(
-              String.format(
-                  "Failed to list first page of Fhir resources from %s: %s",
-                  fhirStore, e.getMessage()));
-        }
+    @Override
+    public JsonArray next() throws NoSuchElementException {
+      try {
+        HttpBody response = executeFhirRequest();
+        this.isFirstRequest = false;
+        JsonObject jsonResponse =
+            
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
+        JsonArray links = jsonResponse.getAsJsonArray("link");
+        this.pageToken = parsePageToken(links);
+        JsonArray resources = jsonResponse.getAsJsonArray("entry");
+        return resources;
+      } catch (IOException e) {
+        this.pageToken = null;
+        throw new NoSuchElementException(
+            String.format("Error listing FHIR resources from %s: %s", 
fhirStore, e.getMessage()));
       }
+    }
 
-      @Override
-      public JsonArray next() throws NoSuchElementException {
-        try {
-          HttpBody response =
-              makeSearchRequest(client, fhirStore, resourceType, parameters, 
this.pageToken);
-          this.isFirstRequest = false;
-          JsonObject jsonResponse =
-              
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
-          JsonArray links = jsonResponse.getAsJsonArray("link");
-          this.pageToken = parsePageToken(links);
-          JsonArray resources = jsonResponse.getAsJsonArray("entry");
-          return resources;
-        } catch (IOException e) {
-          this.pageToken = null;
-          throw new NoSuchElementException(
-              String.format("Error listing Fhir resources from %s: %s", 
fhirStore, e.getMessage()));
-        }
+    private HttpBody executeFhirRequest() throws IOException {
+      switch (fhirMethod) {
+        case PATIENT_EVERYTHING:
+          return client.getPatientEverything(resourceName, parameters, 
pageToken);
+        case SEARCH:
+        default:
+          return client.searchFhirResource(fhirStore, resourceType, 
parameters, pageToken);
       }
+    }
 
-      private static String parsePageToken(JsonArray links) throws 
MalformedURLException {
-        for (JsonElement e : links) {
-          JsonObject link = e.getAsJsonObject();
-          if (link.get("relation").getAsString().equalsIgnoreCase("next")) {
-            URL url = new URL(link.get("url").getAsString());
-            List<String> parameters = 
Splitter.on("&").splitToList(url.getQuery());
-            for (String parameter : parameters) {
-              List<String> parts = 
Splitter.on("=").limit(2).splitToList(parameter);
-              if (parts.get(0).equalsIgnoreCase("_page_token")) {
-                return parts.get(1);
-              }
+    private static String parsePageToken(JsonArray links) throws 
MalformedURLException {
+      for (JsonElement e : links) {
+        JsonObject link = e.getAsJsonObject();
+        if (link.get("relation").getAsString().equalsIgnoreCase("next")) {
+          URL url = new URL(link.get("url").getAsString());
+          List<String> parameters = 
Splitter.on("&").splitToList(url.getQuery());
+          for (String parameter : parameters) {
+            List<String> parts = 
Splitter.on("=").limit(2).splitToList(parameter);
+            if (parts.get(0).equalsIgnoreCase("_page_token")) {
+              return parts.get(1);
             }
           }
         }
-        return "";
       }
+      return "";
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
new file mode 100644
index 0000000..323b4ad
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static 
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectOptions;
+import 
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FhirIOPatientEverythingIT {
+
+  public String version;
+  private final String project;
+  private transient HealthcareApiClient client;
+  private static String healthcareDataset;
+  private static final String BASE_STORE_ID =
+      "FHIR_store_patient_everything_it_"
+          + System.currentTimeMillis()
+          + "_"
+          + new SecureRandom().nextInt(32);
+  private String fhirStoreId;
+
+  private List<PatientEverythingParameter> input = new ArrayList<>();
+
+  @Parameters(name = "{0}")
+  public static Collection<String> versions() {
+    return Arrays.asList("R4");
+  }
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  public FhirIOPatientEverythingIT(String version) {
+    this.version = version;
+    this.fhirStoreId = BASE_STORE_ID + version;
+    this.project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    if (client == null) {
+      this.client = new HttpHealthcareApiClient();
+    }
+    client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
+
+    List<String> bundles = FhirIOTestUtil.BUNDLES.get(version);
+    List<String> prepopulatedResourceNames =
+        FhirIOTestUtil.executeFhirBundles(
+            client, healthcareDataset + "/fhirStores/" + fhirStoreId, bundles);
+
+    HashMap<String, String> filters = new HashMap<>();
+    // filters.put("_count", Integer.toString(50));
+
+    int requests = 0;
+    for (String resourceName : prepopulatedResourceNames) {
+      // Skip non-patient resource types.
+      if (!resourceName.contains("/fhir/Patient/")) {
+        continue;
+      }
+      input.add(
+          PatientEverythingParameter.builder()
+              .setResourceName(resourceName)
+              .setFilters(filters)
+              .build());
+
+      requests++;
+      if (requests > 50) {
+        break;
+      }
+    }
+  }
+
+  @After
+  public void teardown() throws IOException {
+    HealthcareApiClient client = new HttpHealthcareApiClient();
+    for (String version : versions()) {
+      client.deleteFhirStore(healthcareDataset + "/fhirStores/" + 
BASE_STORE_ID + version);
+    }
+  }
+
+  @Test
+  public void testFhirIOPatientEverything() {
+    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+    PCollection<PatientEverythingParameter> everythingConfigs = 
pipeline.apply(Create.of(input));
+    FhirIOPatientEverything.Result result = 
everythingConfigs.apply(FhirIO.getPatientEverything());
+
+    // Verify that there are no failures.
+    PAssert.that(result.getFailedReads()).empty();
+    // Verify that none of the result resource sets are empty sets.
+    PCollection<JsonArray> resources = result.getPatientCompartments();
+    PAssert.that(resources)
+        .satisfies(
+            input -> {
+              for (JsonArray resource : input) {
+                assertNotEquals(0, resource.size());
+              }
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index 6326407..c85d1c4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import 
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -60,8 +61,7 @@ public class FhirIOTest {
 
   @Test
   public void test_FhirIO_failedSearches() {
-    List<FhirSearchParameter<String>> input =
-        Arrays.asList(FhirSearchParameter.of("resource-type-1", null));
+    FhirSearchParameter<String> input = 
FhirSearchParameter.of("resource-type-1", null);
     FhirIO.Search.Result searchResult =
         pipeline
             
.apply(Create.of(input).withCoder(FhirSearchParameterCoder.of(StringUtf8Coder.of())))
@@ -73,7 +73,7 @@ public class FhirIOTest {
         failed.apply(
             
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
 
-    PAssert.that(failedMsgIds).containsInAnyOrder(Arrays.asList("bad-store"));
+    PAssert.that(failedMsgIds).containsInAnyOrder(input.toString());
     PAssert.that(searchResult.getResources()).empty();
     PAssert.that(searchResult.getKeyedResources()).empty();
     pipeline.run();
@@ -81,8 +81,7 @@ public class FhirIOTest {
 
   @Test
   public void test_FhirIO_failedSearchesWithGenericParameters() {
-    List<FhirSearchParameter<List<String>>> input =
-        Arrays.asList(FhirSearchParameter.of("resource-type-1", null));
+    FhirSearchParameter<List<String>> input = 
FhirSearchParameter.of("resource-type-1", null);
     FhirIO.Search.Result searchResult =
         pipeline
             .apply(
@@ -98,7 +97,7 @@ public class FhirIOTest {
         failed.apply(
             
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
 
-    PAssert.that(failedMsgIds).containsInAnyOrder(Arrays.asList("bad-store"));
+    PAssert.that(failedMsgIds).containsInAnyOrder(input.toString());
     PAssert.that(searchResult.getResources()).empty();
     PAssert.that(searchResult.getKeyedResources()).empty();
     pipeline.run();
@@ -131,5 +130,20 @@ public class FhirIOTest {
     pipeline.run();
   }
 
-  private static final long NUM_ELEMENTS = 11;
+  @Test
+  public void test_FhirIO_failedPatientEverything() {
+    PatientEverythingParameter input =
+        
PatientEverythingParameter.builder().setResourceName("bad-resource-name").build();
+    FhirIOPatientEverything.Result everythingResult =
+        pipeline.apply(Create.of(input)).apply(FhirIO.getPatientEverything());
+
+    PCollection<HealthcareIOError<String>> failed = 
everythingResult.getFailedReads();
+    PCollection<String> failedEverything =
+        failed.apply(
+            
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
+
+    PAssert.that(failedEverything).containsInAnyOrder(input.toString());
+    PAssert.that(everythingResult.getPatientCompartments()).empty();
+    pipeline.run();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
index c893f59..6f2a6ce 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.sdk.io.gcp.healthcare;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.http.HttpHeaders;
 import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.healthcare.v1.model.HttpBody;
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -41,6 +46,7 @@ import java.util.stream.Stream;
 import 
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
 
 class FhirIOTestUtil {
+  private static final ObjectMapper mapper = new ObjectMapper();
   public static final String DEFAULT_TEMP_BUCKET = 
"temp-storage-for-healthcare-io-tests";
 
   private static Stream<String> readPrettyBundles(String version) {
@@ -82,15 +88,30 @@ class FhirIOTestUtil {
   }
 
   /** Populate the test resources into the FHIR store and returns a list of 
resource IDs. */
-  static void executeFhirBundles(HealthcareApiClient client, String fhirStore, 
List<String> bundles)
+  static List<String> executeFhirBundles(
+      HealthcareApiClient client, String fhirStore, List<String> bundles)
       throws IOException, HealthcareHttpException {
+    List<String> resourceNames = new ArrayList<>();
     for (String bundle : bundles) {
-      client.executeFhirBundle(fhirStore, bundle);
+      HttpBody resp = client.executeFhirBundle(fhirStore, bundle);
+
+      JsonObject jsonResponse = 
JsonParser.parseString(resp.getData()).getAsJsonObject();
+      for (JsonElement entry : jsonResponse.getAsJsonArray("entry")) {
+        String location =
+            entry
+                .getAsJsonObject()
+                .getAsJsonObject("response")
+                .getAsJsonPrimitive("location")
+                .getAsString();
+        String resourceName =
+            location.substring(location.indexOf("project"), 
location.indexOf("/_history"));
+        resourceNames.add(resourceName);
+      }
     }
+    return resourceNames;
   }
 
   public static void tearDownTempBucket() throws IOException {
-
     GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
     HttpRequestInitializer requestInitializer =
         request -> {

Reply via email to