This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ce29e7 Merge pull request #15936 from [BEAM-13351] FhirIO
GetPatientEverything connector
5ce29e7 is described below
commit 5ce29e7a6f835edc02698ffdfd3dfb5e4287edd1
Author: Milena Bukal <[email protected]>
AuthorDate: Tue Nov 30 11:18:09 2021 -0400
Merge pull request #15936 from [BEAM-13351] FhirIO GetPatientEverything
connector
* GetPatientEverythig implementation
* Adding tests + comments
* Merge branch 'PATIENT_SOMETHING' of https://github.com/msbukal/beam into
PATIENT_SOMETHING
* Fix Presubmit warnings/errors
* Fix checkStyle
* Review fixes
* Review fixes 2
---
.../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 295 +++++++++++----------
.../io/gcp/healthcare/FhirIOPatientEverything.java | 237 +++++++++++++++++
.../sdk/io/gcp/healthcare/FhirSearchParameter.java | 41 ++-
.../sdk/io/gcp/healthcare/HealthcareApiClient.java | 18 +-
.../io/gcp/healthcare/HttpHealthcareApiClient.java | 225 ++++++++--------
.../gcp/healthcare/FhirIOPatientEverythingIT.java | 142 ++++++++++
.../beam/sdk/io/gcp/healthcare/FhirIOTest.java | 28 +-
.../beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java | 27 +-
8 files changed, 741 insertions(+), 272 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 6ca2395..7e7d351 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -54,6 +54,8 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
@@ -94,20 +96,25 @@ import org.slf4j.LoggerFactory;
*
* <h3>Reading</h3>
*
- * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use
cases where you have
- * a ${@link PCollection} of message IDs. This is appropriate for reading the
Fhir notifications
- * from a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases
where you have a
- * manually prepared list of messages that you need to process (e.g. in a text
file read with {@link
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use
cases where you have a
+ * ${@link PCollection} of FHIR resource names in the format of
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+ * This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases
where you have a manually
+ * prepared list of resources that you need to process (e.g. in a text file
read with {@link
* org.apache.beam.sdk.io.TextIO}*) .
*
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection}
of message ID
- * strings {@link FhirIO.Read.Result} where one can call {@link
Read.Result#getResources()} to
- * retrieve a {@link PCollection} containing the successfully fetched {@link
String}s and/or {@link
- * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of
{@link
- * HealthcareIOError}* containing the resource ID that could not be fetched
and the exception as a
+ * <p>Get Resource contents from the FHIR Store based on the {@link
PCollection} of FHIR resource name strings
+ * {@link FhirIO.Read.Result} where one can call {@link
Read.Result#getResources()} to retrieve a
+ * {@link PCollection} containing the successfully fetched json resources as
{@link String}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of
{@link HealthcareIOError}
+ * containing the resources that could not be fetched and the exception as a
* {@link HealthcareIOError}, this can be used to write to the dead letter
storage system of your
* choosing. This error handling is mainly to transparently surface errors
where the upstream {@link
- * PCollection}* contains IDs that are not valid or are not reachable due to
permissions issues.
+ * PCollection} contains FHIR resources that are not valid or are not
reachable due to permissions issues.
+ *
+ * Additionally, you can query an entire FHIR Patient resource's compartment
(resources that
+ * refer to the patient, and are referred to by the patient) by calling {@link
FhirIO.getPatientEverything} to
+ * execute a FHIR GetPatientEverythingRequest.
*
* <h3>Writing</h3>
*
@@ -157,12 +164,12 @@ import org.slf4j.LoggerFactory;
*
PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription()))
* .apply(FhirIO.readResources());
*
- * // happily retrived messages
+ * // happily retrived resources
* PCollection<String> resources = readResult.getResources();
- * // message IDs that couldn't be retrieved + error context
+ * // resource paths that couldn't be retrieved + error context
* PCollection<HealthcareIOError<String>> failedReads =
readResult.getFailedReads();
*
- * failedReads.apply("Write Message IDs / Stacktrace for Failed Reads to
BigQuery",
+ * failedReads.apply("Write Resources / Stacktrace for Failed Reads to
BigQuery",
* BigQueryIO
* .write()
* .to(option.getBQFhirExecuteBundlesDeadLetterTable())
@@ -229,7 +236,7 @@ import org.slf4j.LoggerFactory;
})
public class FhirIO {
- private static final String BASE_METRIC_PREFIX = "fhirio/";
+ static final String BASE_METRIC_PREFIX = "fhirio/";
private static final String LRO_COUNTER_KEY = "counter";
private static final String LRO_SUCCESS_KEY = "success";
private static final String LRO_FAILURE_KEY = "failure";
@@ -372,6 +379,18 @@ public class FhirIO {
}
/**
+ * Get the patient compartment for a FHIR Patient using the
GetPatientEverything/$everything API.
+ *
+ * @see <a
+ *
href=https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/Patient-everything></a>
+ * @return the patient everything
+ * @see FhirIOPatientEverything
+ */
+ public static FhirIOPatientEverything getPatientEverything() {
+ return new FhirIOPatientEverything();
+ }
+
+ /**
* Increments success and failure counters for an LRO. To be used after the
LRO has completed.
* This function leverages the fact that the LRO metadata is always of the
format: "counter": {
* "success": "1", "failure": "1" }
@@ -494,22 +513,22 @@ public class FhirIO {
String transformName, PInput input, PTransform<?, ?> transform) {}
}
- /** The tag for the main output of Fhir Messages. */
+ /** The tag for the main output of FHIR resources. */
public static final TupleTag<String> OUT = new TupleTag<String>() {};
- /** The tag for the deadletter output of Fhir Messages. */
+ /** The tag for the deadletter output of FHIR resources. */
public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
new TupleTag<HealthcareIOError<String>>() {};
@Override
public FhirIO.Read.Result expand(PCollection<String> input) {
- return input.apply("Fetch Fhir messages", new FetchResourceJsonString());
+ return input.apply("Read FHIR Resources", new GetResourceJsonString());
}
/**
- * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store
based on resourceID
+ * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store
based on resource path.
*
* <p>This DoFn consumes a {@link PCollection} of notifications {@link
String}s from the FHIR
- * store, and fetches the actual {@link String} object based on the id in
the notification and
+ * store, and fetches the actual {@link String} object based on the path
in the notification and
* will output a {@link PCollectionTuple} which contains the output and
dead-letter {@link
* PCollection}*.
*
@@ -519,15 +538,14 @@ public class FhirIO {
* <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection}
records successfully read
* from the Fhir store.
* <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link
PCollection} of {@link
- * HealthcareIOError}* of message IDs which failed to be fetched
from the Fhir store, with
+ * HealthcareIOError}* of resources which failed to be fetched from
the FHIR store, with
* error message and stacktrace.
* </ul>
*/
- static class FetchResourceJsonString
- extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+ static class GetResourceJsonString extends PTransform<PCollection<String>,
FhirIO.Read.Result> {
- /** Instantiates a new Fetch Fhir message DoFn. */
- public FetchResourceJsonString() {}
+ /** Instantiates a new Get FHIR resource DoFn. */
+ public GetResourceJsonString() {}
@Override
public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
@@ -537,7 +555,7 @@ public class FhirIO {
.withOutputTags(FhirIO.Read.OUT,
TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
}
- /** DoFn for fetching messages from the Fhir store with error handling.
*/
+ /** DoFn for getting resources from the FHIR store with error handling.
*/
static class ReadResourceFn extends DoFn<String, String> {
private static final Logger LOG =
LoggerFactory.getLogger(ReadResourceFn.class);
@@ -553,7 +571,7 @@ public class FhirIO {
private HealthcareApiClient client;
private ObjectMapper mapper;
- /** Instantiates a new Hl 7 v 2 message get fn. */
+ /** Instantiates a new get FHIR resource fn. */
ReadResourceFn() {}
/**
@@ -581,22 +599,22 @@ public class FhirIO {
READ_RESOURCE_ERRORS.inc();
LOG.warn(
String.format(
- "Error fetching Fhir message with ID %s writing to Dead
Letter "
+ "Error fetching Fhir resource with ID %s writing to Dead
Letter "
+ "Queue. Cause: %s Stack Trace: %s",
resourceId, e.getMessage(),
Throwables.getStackTraceAsString(e)));
context.output(FhirIO.Read.DEAD_LETTER,
HealthcareIOError.of(resourceId, e));
}
}
- private String fetchResource(HealthcareApiClient client, String
resourceId)
+ private String fetchResource(HealthcareApiClient client, String
resourceName)
throws IOException, IllegalArgumentException {
long startTime = Instant.now().toEpochMilli();
- HttpBody resource = client.readFhirResource(resourceId);
+ HttpBody resource = client.readFhirResource(resourceName);
READ_RESOURCE_LATENCY_MS.update(Instant.now().toEpochMilli() -
startTime);
if (resource == null) {
- throw new IOException(String.format("GET request for %s returned
null", resourceId));
+ throw new IOException(String.format("GET request for %s returned
null", resourceName));
}
READ_RESOURCE_SUCCESS.inc();
return mapper.writeValueAsString(resource);
@@ -823,10 +841,10 @@ public class FhirIO {
}
/**
- * Create Method creates a single FHIR resource. @see <a
- *
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/create></a>
+ * Import method for batch writing resources. @see <a
+ *
href=https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import></a>
*
- * @param fhirStore the hl 7 v 2 store
+ * @param fhirStore the FHIR store
* @param gcsTempPath the gcs temp path
* @param gcsDeadLetterPath the gcs dead letter path
* @param contentStructure the content structure
@@ -873,7 +891,8 @@ public class FhirIO {
}
/**
- * Execute Bundle Method executes a batch of requests as a single
transaction @see <a
+ * Execute Bundle Method executes a batch of requests in batch or as a
single transaction @see
+ * <a
*
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
*
* @param fhirStore the fhir store
@@ -887,7 +906,9 @@ public class FhirIO {
}
/**
- * Execute bundles write.
+ * Execute Bundle Method executes a batch of requests in batch or as a
single transaction @see
+ * <a
+ *
href=https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
*
* @param fhirStore the fhir store
* @return the write
@@ -921,15 +942,8 @@ public class FhirIO {
return input.apply(new Import(getFhirStore(), tempPath, deadPath,
contentStructure));
case EXECUTE_BUNDLE:
default:
- bundles =
- input.apply(
- "Execute FHIR Bundles",
- ParDo.of(new
ExecuteBundles.ExecuteBundlesFn(this.getFhirStore()))
- .withOutputTags(SUCCESSFUL_BODY,
TupleTagList.of(FAILED_BODY)));
- bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
-
bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+ return input.apply(new ExecuteBundles(this.getFhirStore()));
}
- return Result.in(input.getPipeline(), bundles);
}
}
@@ -1159,7 +1173,7 @@ public class FhirIO {
String resource =
String.format(
"Failed to parse payload: %s as json at: %s : %s."
- + "Dropping message from batch import.",
+ + "Dropping resource from batch import.",
httpBody, e.getLocation().getCharOffset(), e.getMessage());
LOG.warn(resource);
context.output(
@@ -1326,7 +1340,7 @@ public class FhirIO {
}
/** The type Execute bundles. */
- public static class ExecuteBundles extends PTransform<PCollection<String>,
Write.Result> {
+ public static class ExecuteBundles extends Write {
private final ValueProvider<String> fhirStore;
@@ -1339,13 +1353,29 @@ public class FhirIO {
this.fhirStore = fhirStore;
}
- /**
- * Instantiates a new Execute bundles.
- *
- * @param fhirStore the fhir store
- */
- ExecuteBundles(String fhirStore) {
- this.fhirStore = StaticValueProvider.of(fhirStore);
+ @Override
+ ValueProvider<String> getFhirStore() {
+ return fhirStore;
+ }
+
+ @Override
+ WriteMethod getWriteMethod() {
+ return WriteMethod.EXECUTE_BUNDLE;
+ }
+
+ @Override
+ Optional<ContentStructure> getContentStructure() {
+ return Optional.empty();
+ }
+
+ @Override
+ Optional<ValueProvider<String>> getImportGcsTempPath() {
+ return Optional.empty();
+ }
+
+ @Override
+ Optional<ValueProvider<String>> getImportGcsDeadLetterPath() {
+ return Optional.empty();
}
@Override
@@ -1666,16 +1696,20 @@ public class FhirIO {
String transformName, PInput input, PTransform<?, ?> transform) {}
}
- /** The tag for the main output of Fhir Messages. */
+ /** The tag for the main output of FHIR Resources from a search. */
public static final TupleTag<KV<String, JsonArray>> OUT =
new TupleTag<KV<String, JsonArray>>() {};
- /** The tag for the deadletter output of Fhir Messages. */
+ /** The tag for the deadletter output of FHIR Resources. */
public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
new TupleTag<HealthcareIOError<String>>() {};
@Override
public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>>
input) {
- return input.apply("Fetch Fhir messages", new
SearchResourcesJsonString(this.fhirStore));
+ PCollectionTuple results =
+ input.apply(
+ ParDo.of(new SearchResourcesFn(this.fhirStore))
+ .withOutputTags(FhirIO.Search.OUT,
TupleTagList.of(FhirIO.Search.DEAD_LETTER)));
+ return FhirIO.Search.Result.of(results);
}
/**
@@ -1696,108 +1730,81 @@ public class FhirIO {
* stacktrace.
* </ul>
*/
- class SearchResourcesJsonString
- extends PTransform<PCollection<FhirSearchParameter<T>>,
FhirIO.Search.Result> {
+ class SearchResourcesFn extends DoFn<FhirSearchParameter<T>, KV<String,
JsonArray>> {
+ private final Counter searchResourcesErrorCount =
+ Metrics.counter(
+ SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_error_count");
+ private final Counter searchResourcesSuccessCount =
+ Metrics.counter(
+ SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_success_count");
+ private final Distribution searchResourcesLatencyMs =
+ Metrics.distribution(
+ SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_latency_ms");
+ private final Logger log =
LoggerFactory.getLogger(SearchResourcesFn.class);
+
+ private HealthcareApiClient client;
private final ValueProvider<String> fhirStore;
- public SearchResourcesJsonString(ValueProvider<String> fhirStore) {
+ /** Instantiates a new FHIR resources search fn. */
+ SearchResourcesFn(ValueProvider<String> fhirStore) {
this.fhirStore = fhirStore;
}
- @Override
- public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>>
resourceIds) {
- return new FhirIO.Search.Result(
- resourceIds.apply(
- ParDo.of(new SearchResourcesFn(this.fhirStore))
- .withOutputTags(
- FhirIO.Search.OUT,
TupleTagList.of(FhirIO.Search.DEAD_LETTER))));
+ /**
+ * Instantiate healthcare client.
+ *
+ * @throws IOException the io exception
+ */
+ @Setup
+ public void instantiateHealthcareClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
}
- /** DoFn for searching messages from the Fhir store with error handling.
*/
- class SearchResourcesFn extends DoFn<FhirSearchParameter<T>, KV<String,
JsonArray>> {
-
- private final Counter searchResourceErrors =
- Metrics.counter(
- SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_error_count");
- private final Counter searchResourceSuccess =
- Metrics.counter(
- SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_success_count");
- private final Distribution searchResourceLatencyMs =
- Metrics.distribution(
- SearchResourcesFn.class, BASE_METRIC_PREFIX +
"search_resource_latency_ms");
-
- private final Logger log =
LoggerFactory.getLogger(SearchResourcesFn.class);
- private HealthcareApiClient client;
- private final ValueProvider<String> fhirStore;
-
- /** Instantiates a new Fhir resources search fn. */
- SearchResourcesFn(ValueProvider<String> fhirStore) {
- this.fhirStore = fhirStore;
+ /**
+ * Process element.
+ *
+ * @param context the context
+ */
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FhirSearchParameter<T> fhirSearchParameters = context.element();
+ try {
+ context.output(
+ KV.of(
+ fhirSearchParameters.getKey(),
+ searchResources(
+ fhirSearchParameters.getResourceType(),
fhirSearchParameters.getQueries())));
+ } catch (IllegalArgumentException | NoSuchElementException e) {
+ searchResourcesErrorCount.inc();
+ log.warn(
+ String.format(
+ "Error search FHIR resources writing to Dead Letter "
+ + "Queue. Cause: %s Stack Trace: %s",
+ e.getMessage(), Throwables.getStackTraceAsString(e)));
+ context.output(
+ FhirIO.Search.DEAD_LETTER,
HealthcareIOError.of(fhirSearchParameters.toString(), e));
}
+ }
- /**
- * Instantiate healthcare client.
- *
- * @throws IOException the io exception
- */
- @Setup
- public void instantiateHealthcareClient() throws IOException {
- this.client = new HttpHealthcareApiClient();
- }
+ private JsonArray searchResources(String resourceType, @Nullable
Map<String, T> parameters)
+ throws NoSuchElementException {
+ long start = Instant.now().toEpochMilli();
- /**
- * Process element.
- *
- * @param context the context
- */
- @ProcessElement
- public void processElement(ProcessContext context) {
- FhirSearchParameter<T> fhirSearchParameters = context.element();
- try {
- context.output(
- KV.of(
- fhirSearchParameters.getKey(),
- searchResources(
- this.client,
- this.fhirStore.toString(),
- fhirSearchParameters.getResourceType(),
- fhirSearchParameters.getQueries())));
- } catch (IllegalArgumentException | NoSuchElementException e) {
- searchResourceErrors.inc();
- log.warn(
- String.format(
- "Error search FHIR messages writing to Dead Letter "
- + "Queue. Cause: %s Stack Trace: %s",
- e.getMessage(), Throwables.getStackTraceAsString(e)));
- context.output(
- FhirIO.Search.DEAD_LETTER,
HealthcareIOError.of(this.fhirStore.toString(), e));
- }
+ HashMap<String, Object> parameterObjects = new HashMap<>();
+ if (parameters != null) {
+ parameters.forEach(parameterObjects::put);
}
-
- private JsonArray searchResources(
- HealthcareApiClient client,
- String fhirStore,
- String resourceType,
- @Nullable Map<String, T> parameters)
- throws NoSuchElementException {
- long start = Instant.now().toEpochMilli();
-
- HashMap<String, Object> parameterObjects = new HashMap<>();
- if (parameters != null) {
- parameters.forEach(parameterObjects::put);
- }
- HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator
iter =
- new
HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator(
- client, fhirStore, resourceType, parameterObjects);
- JsonArray result = new JsonArray();
- while (iter.hasNext()) {
- result.addAll(iter.next());
- }
-
searchResourceLatencyMs.update(java.time.Instant.now().toEpochMilli() - start);
- searchResourceSuccess.inc();
- return result;
+ FhirResourcePagesIterator iter =
+ FhirResourcePagesIterator.ofSearch(
+ client, fhirStore.toString(), resourceType, parameterObjects);
+ JsonArray result = new JsonArray();
+ while (iter.hasNext()) {
+ result.addAll(iter.next());
}
+ searchResourcesLatencyMs.update(java.time.Instant.now().toEpochMilli()
- start);
+ searchResourcesSuccessCount.inc();
+ return result;
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
new file mode 100644
index 0000000..8d91536
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.BASE_METRIC_PREFIX;
+
+import com.google.auto.value.AutoValue;
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The type FhirIOPatientEverything for querying a FHIR Patient resource's
compartment. * */
+public class FhirIOPatientEverything
+ extends PTransform<PCollection<PatientEverythingParameter>,
FhirIOPatientEverything.Result> {
+
+ /** The tag for the main output of FHIR Resources from a
GetPatientEverything request. */
+ public static final TupleTag<JsonArray> OUT = new TupleTag<JsonArray>() {};
+ /** The tag for the deadletter output of FHIR Resources from a
GetPatientEverything request. */
+ public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+ new TupleTag<HealthcareIOError<String>>() {};
+
+ /**
+ * PatientEverythingParameter defines required attributes for a FHIR
GetPatientEverything request
+ * in {@link FhirIOPatientEverything}. *
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class PatientEverythingParameter implements
Serializable {
+
+ /**
+ * FHIR Patient resource name in the format of
+ *
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+ */
+ abstract String getResourceName();
+ /** Optional filters for the request, eg. start, end, _type, _since,
_count */
+ abstract @Nullable Map<String, String> getFilters();
+
+ static Builder builder() {
+ return new
AutoValue_FhirIOPatientEverything_PatientEverythingParameter.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setResourceName(String resourceName);
+
+ abstract Builder setFilters(Map<String, String> query);
+
+ abstract PatientEverythingParameter build();
+ }
+ }
+
+ /** The Result for a {@link FhirIOPatientEverything} request. */
+ public static class Result implements POutput, PInput {
+
+ private final PCollection<JsonArray> patientCompartments;
+ private final PCollection<HealthcareIOError<String>> failedReads;
+
+ PCollectionTuple pct;
+
+ /**
+ * Create FhirIOPatientEverything.Result form PCollectionTuple with OUT
and DEAD_LETTER tags.
+ *
+ * @param pct the pct
+ * @return the patient everything result
+ * @throws IllegalArgumentException the illegal argument exception
+ */
+ static Result of(PCollectionTuple pct) throws IllegalArgumentException {
+ if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
+ return new Result(pct);
+ } else {
+ throw new IllegalArgumentException(
+ "The PCollection tuple must have the FhirIOPatientEverything.OUT "
+ + "and FhirIOPatientEverything.DEAD_LETTER tuple tags");
+ }
+ }
+
+ private Result(PCollectionTuple pct) {
+ this.pct = pct;
+ this.patientCompartments = pct.get(OUT).setCoder(JsonArrayCoder.of());
+ this.failedReads =
+
pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+ }
+
+ /**
+ * Gets failed reads.
+ *
+ * @return the failed reads
+ */
+ public PCollection<HealthcareIOError<String>> getFailedReads() {
+ return failedReads;
+ }
+
+ /**
+ * Gets the patient compartment responses for GetPatientEverything
requests.
+ *
+ * @return the read patient compartments
+ */
+ public PCollection<JsonArray> getPatientCompartments() {
+ return patientCompartments;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return this.pct.getPipeline();
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of(OUT, patientCompartments, DEAD_LETTER,
failedReads);
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
+ }
+
+ @Override
+ public Result expand(PCollection<PatientEverythingParameter> input) {
+ PCollectionTuple results =
+ input.apply(
+ "GetPatientEverything",
+ ParDo.of(new GetPatientEverythingFn())
+ .withOutputTags(OUT, TupleTagList.of(DEAD_LETTER)));
+ return new Result(results);
+ }
+
+ /** GetPatientEverythingFn executes a GetPatientEverything request. */
+ static class GetPatientEverythingFn extends DoFn<PatientEverythingParameter,
JsonArray> {
+
+ private static final Counter GET_PATIENT_EVERYTHING_ERROR_COUNT =
+ Metrics.counter(
+ GetPatientEverythingFn.class,
+ BASE_METRIC_PREFIX + "get_patient_everything_error_count");
+ private static final Counter GET_PATIENT_EVERYTHING_SUCCESS_COUNT =
+ Metrics.counter(
+ GetPatientEverythingFn.class,
+ BASE_METRIC_PREFIX + "get_patient_everything_success_count");
+ private static final Distribution GET_PATIENT_EVERYTHING_LATENCY_MS =
+ Metrics.distribution(
+ GetPatientEverythingFn.class, BASE_METRIC_PREFIX +
"get_patient_everything_latency_ms");
+ private static final Logger LOG =
LoggerFactory.getLogger(GetPatientEverythingFn.class);
+
+ @SuppressWarnings("initialization.fields.uninitialized")
+ private HealthcareApiClient client;
+
+ /**
+ * Instantiate healthcare client.
+ *
+ * @throws IOException the io exception
+ */
+ @Setup
+ public void instantiateHealthcareClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ PatientEverythingParameter patientEverythingParameter =
context.element();
+ try {
+ context.output(
+ getPatientEverything(
+ patientEverythingParameter.getResourceName(),
+ patientEverythingParameter.getFilters()));
+ } catch (IllegalArgumentException | NoSuchElementException e) {
+ GET_PATIENT_EVERYTHING_ERROR_COUNT.inc();
+ LOG.warn(
+ String.format(
+ "Error executing GetPatientEverything: FHIR resources writing
to Dead Letter "
+ + "Queue. Cause: %s Stack Trace: %s",
+ e.getMessage(), Throwables.getStackTraceAsString(e)));
+ context.output(DEAD_LETTER,
HealthcareIOError.of(patientEverythingParameter.toString(), e));
+ }
+ }
+
+ private JsonArray getPatientEverything(
+ String resourceName, @Nullable Map<String, String> filters) {
+ long start = Instant.now().toEpochMilli();
+
+ HashMap<String, Object> filterObjects = new HashMap<>();
+ if (filters != null) {
+ filterObjects.putAll(filters);
+ }
+ FhirResourcePagesIterator iter =
+ FhirResourcePagesIterator.ofPatientEverything(client, resourceName,
filterObjects);
+ JsonArray result = new JsonArray();
+ while (iter.hasNext()) {
+ result.addAll(iter.next());
+ }
+
GET_PATIENT_EVERYTHING_LATENCY_MS.update(java.time.Instant.now().toEpochMilli()
- start);
+ GET_PATIENT_EVERYTHING_SUCCESS_COUNT.inc();
+ return result;
+ }
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index 6f80d86..87692d8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -29,10 +29,17 @@ import org.checkerframework.checker.nullness.qual.Nullable;
@DefaultCoder(FhirSearchParameterCoder.class)
public class FhirSearchParameter<T> {
+ /** FHIR resource type. */
private final String resourceType;
- // The key is used as a key for the search query, if there is source
information to propagate
- // through the pipeline.
+ /**
+ * The key is used as a key for the search query, if there is source
information to propagate
+ * through the pipeline.
+ */
private final String key;
+ /**
+ * The search query. For an OR search, put both query values in a single
string. For an AND
+ * search, use a list.
+ */
private final @Nullable Map<String, T> queries;
private FhirSearchParameter(
@@ -46,11 +53,28 @@ public class FhirSearchParameter<T> {
this.queries = queries;
}
+ /**
+ * Creates a FhirSearchParameter of type T.
+ *
+ * @param resourceType the FHIR resource type.
+ * @param key Key for the search query to key the results with.
+ * @param queries The search query.
+ * @param <T> The type of the search query value.
+ * @return The FhirSearchParameter of type T.
+ */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable String key, @Nullable Map<String, T>
queries) {
return new FhirSearchParameter<>(resourceType, key, queries);
}
+ /**
+ * Creates a FhirSearchParameter of type T, without a key.
+ *
+ * @param resourceType the FHIR resource type.
+ * @param queries The search query.
+ * @param <T> The type of the search query value.
+ * @return The FhirSearchParameter of type T.
+ */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, null, queries);
@@ -89,15 +113,8 @@ public class FhirSearchParameter<T> {
@Override
public String toString() {
- return "FhirSearchParameter{"
- + "resourceType='"
- + resourceType
- + '\''
- + ", key='"
- + key
- + '\''
- + ", queries="
- + queries
- + '}';
+ return String.format(
+ "FhirSearchParameter{resourceType='%s', key='%s', queries='%s'}'",
+ resourceType, key, queries);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index 13b2d5c..afda939 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -164,11 +164,12 @@ public interface HealthcareApiClient {
/**
* Read fhir resource http body.
*
- * @param resourceId the resource
+ * @param resourceName the resource name, in format
+ *
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
* @return the http body
* @throws IOException the io exception
*/
- HttpBody readFhirResource(String resourceId) throws IOException;
+ HttpBody readFhirResource(String resourceName) throws IOException;
/**
* Search fhir resource http body.
@@ -187,6 +188,19 @@ public interface HealthcareApiClient {
throws IOException;
/**
+ * Fhir get patient everythhing http body.
+ *
+ * @param resourceName the resource name, in format
+ *
projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
+ * @param filters optional request filters
+ * @return the http body
+ * @throws IOException
+ */
+ HttpBody getPatientEverything(
+ String resourceName, @Nullable Map<String, Object> filters, String
pageToken)
+ throws IOException;
+
+ /**
* Create hl 7 v 2 store hl 7 v 2 store.
*
* @param dataset the dataset
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 0fdd721..13dfe28 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -25,6 +25,7 @@ import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.healthcare.v1.CloudHealthcare;
+import
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.PatientEverything;
import
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.Search;
import
com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.Hl7V2Stores.Messages;
import com.google.api.services.healthcare.v1.CloudHealthcareScopes;
@@ -629,8 +630,15 @@ public class HttpHealthcareApiClient implements
HealthcareApiClient, Serializabl
}
@Override
- public HttpBody readFhirResource(String resourceId) throws IOException {
- return
client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
+ public HttpBody readFhirResource(String resourceName) throws IOException {
+ return client
+ .projects()
+ .locations()
+ .datasets()
+ .fhirStores()
+ .fhir()
+ .read(resourceName)
+ .execute();
}
@Override
@@ -652,6 +660,27 @@ public class HttpHealthcareApiClient implements
HealthcareApiClient, Serializabl
return search.execute();
}
+ @Override
+ public HttpBody getPatientEverything(
+ String resourceName, @Nullable Map<String, Object> filters, String
pageToken)
+ throws IOException {
+ PatientEverything patientEverything =
+ client
+ .projects()
+ .locations()
+ .datasets()
+ .fhirStores()
+ .fhir()
+ .patientEverything(resourceName);
+ if (filters != null && !filters.isEmpty()) {
+ filters.forEach(patientEverything::set);
+ }
+ if (pageToken != null && !pageToken.isEmpty()) {
+ patientEverything.set("_page_token", URLDecoder.decode(pageToken,
"UTF-8"));
+ }
+ return patientEverything.execute();
+ }
+
public static class AuthenticatedRetryInitializer extends
RetryHttpRequestInitializer {
GoogleCredentials credentials;
@@ -862,147 +891,135 @@ public class HttpHealthcareApiClient implements
HealthcareApiClient, Serializabl
}
}
- public static class FhirResourcePages implements Iterable<JsonArray> {
+ /** The type FhirResourcePagesIterator for methods which return paged
output. */
+ public static class FhirResourcePagesIterator implements Iterator<JsonArray>
{
+
+ public enum FhirMethod {
+ SEARCH,
+ PATIENT_EVERYTHING
+ }
+ private final FhirMethod fhirMethod;
private final String fhirStore;
private final String resourceType;
+ private final String resourceName;
private final Map<String, Object> parameters;
- private transient HealthcareApiClient client;
- /**
- * Instantiates a new Fhir resource pages.
- *
- * @param client the client
- * @param fhirStore the Fhir store
- * @param resourceType the Fhir resource type to search for
- * @param parameters the search parameters
- */
- FhirResourcePages(
+ private final HealthcareApiClient client;
+ private final ObjectMapper mapper;
+ private String pageToken;
+ private boolean isFirstRequest;
+
+ private FhirResourcePagesIterator(
+ FhirMethod fhirMethod,
HealthcareApiClient client,
String fhirStore,
String resourceType,
+ String resourceName,
@Nullable Map<String, Object> parameters) {
+ this.fhirMethod = fhirMethod;
this.client = client;
this.fhirStore = fhirStore;
this.resourceType = resourceType;
+ this.resourceName = resourceName;
this.parameters = parameters;
+ this.pageToken = null;
+ this.isFirstRequest = true;
+ this.mapper = new ObjectMapper();
}
/**
- * Make search request.
+ * Instantiates a new search FHIR resource pages iterator.
*
* @param client the client
* @param fhirStore the Fhir store
* @param resourceType the Fhir resource type to search for
- * @param parameters the search parameters
- * @param pageToken the page token
- * @return the search response
- * @throws IOException the io exception
+ * @param parameters the query parameters
*/
- public static HttpBody makeSearchRequest(
+ public static FhirResourcePagesIterator ofSearch(
HealthcareApiClient client,
String fhirStore,
String resourceType,
- @Nullable Map<String, Object> parameters,
- String pageToken)
- throws IOException {
- return client.searchFhirResource(fhirStore, resourceType, parameters,
pageToken);
+ @Nullable Map<String, Object> parameters) {
+ return new FhirResourcePagesIterator(
+ FhirMethod.SEARCH, client, fhirStore, resourceType, "", parameters);
}
- @Override
- public Iterator<JsonArray> iterator() {
+ /**
+ * Instantiates a new GetPatientEverything FHIR resource pages iterator.
+ *
+ * @param client the client
+ * @param resourceName the FHIR resource name
+ * @param parameters the filter parameters
+ */
+ public static FhirResourcePagesIterator ofPatientEverything(
+ HealthcareApiClient client, String resourceName, @Nullable Map<String,
Object> parameters) {
return new FhirResourcePagesIterator(
- this.client, this.fhirStore, this.resourceType, this.parameters);
+ FhirMethod.PATIENT_EVERYTHING, client, "", "", resourceName,
parameters);
}
- /** The type Fhir resource pages iterator. */
- public static class FhirResourcePagesIterator implements
Iterator<JsonArray> {
-
- private final String fhirStore;
- private final String resourceType;
- private final Map<String, Object> parameters;
- private HealthcareApiClient client;
- private String pageToken;
- private boolean isFirstRequest;
- private ObjectMapper mapper;
-
- /**
- * Instantiates a new Fhir resource pages iterator.
- *
- * @param client the client
- * @param fhirStore the Fhir store
- * @param resourceType the Fhir resource type to search for
- * @param parameters the search parameters
- */
- FhirResourcePagesIterator(
- HealthcareApiClient client,
- String fhirStore,
- String resourceType,
- @Nullable Map<String, Object> parameters) {
- this.client = client;
- this.fhirStore = fhirStore;
- this.resourceType = resourceType;
- this.parameters = parameters;
- this.pageToken = null;
- this.isFirstRequest = true;
- this.mapper = new ObjectMapper();
+ @Override
+ public boolean hasNext() throws NoSuchElementException {
+ if (!isFirstRequest) {
+ return this.pageToken != null && !this.pageToken.isEmpty();
}
+ try {
+ HttpBody response = executeFhirRequest();
+ JsonObject jsonResponse =
+
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
+ JsonArray resources = jsonResponse.getAsJsonArray("entry");
+ return resources != null && resources.size() != 0;
+ } catch (IOException e) {
+ throw new NoSuchElementException(
+ String.format(
+ "Failed to list first page of FHIR resources from %s: %s",
+ fhirStore, e.getMessage()));
+ }
+ }
- @Override
- public boolean hasNext() throws NoSuchElementException {
- if (!isFirstRequest) {
- return this.pageToken != null && !this.pageToken.isEmpty();
- }
- try {
- HttpBody response =
- makeSearchRequest(client, fhirStore, resourceType, parameters,
this.pageToken);
- JsonObject jsonResponse =
-
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
- JsonArray resources = jsonResponse.getAsJsonArray("entry");
- return resources != null && resources.size() != 0;
- } catch (IOException e) {
- throw new NoSuchElementException(
- String.format(
- "Failed to list first page of Fhir resources from %s: %s",
- fhirStore, e.getMessage()));
- }
+ @Override
+ public JsonArray next() throws NoSuchElementException {
+ try {
+ HttpBody response = executeFhirRequest();
+ this.isFirstRequest = false;
+ JsonObject jsonResponse =
+
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
+ JsonArray links = jsonResponse.getAsJsonArray("link");
+ this.pageToken = parsePageToken(links);
+ JsonArray resources = jsonResponse.getAsJsonArray("entry");
+ return resources;
+ } catch (IOException e) {
+ this.pageToken = null;
+ throw new NoSuchElementException(
+ String.format("Error listing FHIR resources from %s: %s",
fhirStore, e.getMessage()));
}
+ }
- @Override
- public JsonArray next() throws NoSuchElementException {
- try {
- HttpBody response =
- makeSearchRequest(client, fhirStore, resourceType, parameters,
this.pageToken);
- this.isFirstRequest = false;
- JsonObject jsonResponse =
-
JsonParser.parseString(mapper.writeValueAsString(response)).getAsJsonObject();
- JsonArray links = jsonResponse.getAsJsonArray("link");
- this.pageToken = parsePageToken(links);
- JsonArray resources = jsonResponse.getAsJsonArray("entry");
- return resources;
- } catch (IOException e) {
- this.pageToken = null;
- throw new NoSuchElementException(
- String.format("Error listing Fhir resources from %s: %s",
fhirStore, e.getMessage()));
- }
+ private HttpBody executeFhirRequest() throws IOException {
+ switch (fhirMethod) {
+ case PATIENT_EVERYTHING:
+ return client.getPatientEverything(resourceName, parameters,
pageToken);
+ case SEARCH:
+ default:
+ return client.searchFhirResource(fhirStore, resourceType,
parameters, pageToken);
}
+ }
- private static String parsePageToken(JsonArray links) throws
MalformedURLException {
- for (JsonElement e : links) {
- JsonObject link = e.getAsJsonObject();
- if (link.get("relation").getAsString().equalsIgnoreCase("next")) {
- URL url = new URL(link.get("url").getAsString());
- List<String> parameters =
Splitter.on("&").splitToList(url.getQuery());
- for (String parameter : parameters) {
- List<String> parts =
Splitter.on("=").limit(2).splitToList(parameter);
- if (parts.get(0).equalsIgnoreCase("_page_token")) {
- return parts.get(1);
- }
+ private static String parsePageToken(JsonArray links) throws
MalformedURLException {
+ for (JsonElement e : links) {
+ JsonObject link = e.getAsJsonObject();
+ if (link.get("relation").getAsString().equalsIgnoreCase("next")) {
+ URL url = new URL(link.get("url").getAsString());
+ List<String> parameters =
Splitter.on("&").splitToList(url.getQuery());
+ for (String parameter : parameters) {
+ List<String> parts =
Splitter.on("=").limit(2).splitToList(parameter);
+ if (parts.get(0).equalsIgnoreCase("_page_token")) {
+ return parts.get(1);
}
}
}
- return "";
}
+ return "";
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
new file mode 100644
index 0000000..323b4ad
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverythingIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectOptions;
+import
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FhirIOPatientEverythingIT {
+
+ public String version;
+ private final String project;
+ private transient HealthcareApiClient client;
+ private static String healthcareDataset;
+ private static final String BASE_STORE_ID =
+ "FHIR_store_patient_everything_it_"
+ + System.currentTimeMillis()
+ + "_"
+ + new SecureRandom().nextInt(32);
+ private String fhirStoreId;
+
+ private List<PatientEverythingParameter> input = new ArrayList<>();
+
+ @Parameters(name = "{0}")
+ public static Collection<String> versions() {
+ return Arrays.asList("R4");
+ }
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ public FhirIOPatientEverythingIT(String version) {
+ this.version = version;
+ this.fhirStoreId = BASE_STORE_ID + version;
+ this.project =
+ TestPipeline.testingPipelineOptions()
+ .as(HealthcareStoreTestPipelineOptions.class)
+ .getStoreProjectId();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+ if (client == null) {
+ this.client = new HttpHealthcareApiClient();
+ }
+ client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
+
+ List<String> bundles = FhirIOTestUtil.BUNDLES.get(version);
+ List<String> prepopulatedResourceNames =
+ FhirIOTestUtil.executeFhirBundles(
+ client, healthcareDataset + "/fhirStores/" + fhirStoreId, bundles);
+
+ HashMap<String, String> filters = new HashMap<>();
+ // filters.put("_count", Integer.toString(50));
+
+ int requests = 0;
+ for (String resourceName : prepopulatedResourceNames) {
+ // Skip non-patient resource types.
+ if (!resourceName.contains("/fhir/Patient/")) {
+ continue;
+ }
+ input.add(
+ PatientEverythingParameter.builder()
+ .setResourceName(resourceName)
+ .setFilters(filters)
+ .build());
+
+ requests++;
+ if (requests > 50) {
+ break;
+ }
+ }
+ }
+
+ @After
+ public void teardown() throws IOException {
+ HealthcareApiClient client = new HttpHealthcareApiClient();
+ for (String version : versions()) {
+ client.deleteFhirStore(healthcareDataset + "/fhirStores/" +
BASE_STORE_ID + version);
+ }
+ }
+
+ @Test
+ public void testFhirIOPatientEverything() {
+ pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+ PCollection<PatientEverythingParameter> everythingConfigs =
pipeline.apply(Create.of(input));
+ FhirIOPatientEverything.Result result =
everythingConfigs.apply(FhirIO.getPatientEverything());
+
+ // Verify that there are no failures.
+ PAssert.that(result.getFailedReads()).empty();
+ // Verify that none of the result resource sets are empty sets.
+ PCollection<JsonArray> resources = result.getPatientCompartments();
+ PAssert.that(resources)
+ .satisfies(
+ input -> {
+ for (JsonArray resource : input) {
+ assertNotEquals(0, resource.size());
+ }
+ return null;
+ });
+
+ pipeline.run().waitUntilFinish();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index 6326407..c85d1c4 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import
org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -60,8 +61,7 @@ public class FhirIOTest {
@Test
public void test_FhirIO_failedSearches() {
- List<FhirSearchParameter<String>> input =
- Arrays.asList(FhirSearchParameter.of("resource-type-1", null));
+ FhirSearchParameter<String> input =
FhirSearchParameter.of("resource-type-1", null);
FhirIO.Search.Result searchResult =
pipeline
.apply(Create.of(input).withCoder(FhirSearchParameterCoder.of(StringUtf8Coder.of())))
@@ -73,7 +73,7 @@ public class FhirIOTest {
failed.apply(
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
- PAssert.that(failedMsgIds).containsInAnyOrder(Arrays.asList("bad-store"));
+ PAssert.that(failedMsgIds).containsInAnyOrder(input.toString());
PAssert.that(searchResult.getResources()).empty();
PAssert.that(searchResult.getKeyedResources()).empty();
pipeline.run();
@@ -81,8 +81,7 @@ public class FhirIOTest {
@Test
public void test_FhirIO_failedSearchesWithGenericParameters() {
- List<FhirSearchParameter<List<String>>> input =
- Arrays.asList(FhirSearchParameter.of("resource-type-1", null));
+ FhirSearchParameter<List<String>> input =
FhirSearchParameter.of("resource-type-1", null);
FhirIO.Search.Result searchResult =
pipeline
.apply(
@@ -98,7 +97,7 @@ public class FhirIOTest {
failed.apply(
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
- PAssert.that(failedMsgIds).containsInAnyOrder(Arrays.asList("bad-store"));
+ PAssert.that(failedMsgIds).containsInAnyOrder(input.toString());
PAssert.that(searchResult.getResources()).empty();
PAssert.that(searchResult.getKeyedResources()).empty();
pipeline.run();
@@ -131,5 +130,20 @@ public class FhirIOTest {
pipeline.run();
}
- private static final long NUM_ELEMENTS = 11;
+ @Test
+ public void test_FhirIO_failedPatientEverything() {
+ PatientEverythingParameter input =
+
PatientEverythingParameter.builder().setResourceName("bad-resource-name").build();
+ FhirIOPatientEverything.Result everythingResult =
+ pipeline.apply(Create.of(input)).apply(FhirIO.getPatientEverything());
+
+ PCollection<HealthcareIOError<String>> failed =
everythingResult.getFailedReads();
+ PCollection<String> failedEverything =
+ failed.apply(
+
MapElements.into(TypeDescriptors.strings()).via(HealthcareIOError::getDataResource));
+
+ PAssert.that(failedEverything).containsInAnyOrder(input.toString());
+ PAssert.that(everythingResult.getPatientCompartments()).empty();
+ pipeline.run();
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
index c893f59..6f2a6ce 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
@@ -17,13 +17,18 @@
*/
package org.apache.beam.sdk.io.gcp.healthcare;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.healthcare.v1.model.HttpBody;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.oauth2.GoogleCredentials;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -41,6 +46,7 @@ import java.util.stream.Stream;
import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
class FhirIOTestUtil {
+ private static final ObjectMapper mapper = new ObjectMapper();
public static final String DEFAULT_TEMP_BUCKET =
"temp-storage-for-healthcare-io-tests";
private static Stream<String> readPrettyBundles(String version) {
@@ -82,15 +88,30 @@ class FhirIOTestUtil {
}
/** Populate the test resources into the FHIR store and returns a list of
resource IDs. */
- static void executeFhirBundles(HealthcareApiClient client, String fhirStore,
List<String> bundles)
+ static List<String> executeFhirBundles(
+ HealthcareApiClient client, String fhirStore, List<String> bundles)
throws IOException, HealthcareHttpException {
+ List<String> resourceNames = new ArrayList<>();
for (String bundle : bundles) {
- client.executeFhirBundle(fhirStore, bundle);
+ HttpBody resp = client.executeFhirBundle(fhirStore, bundle);
+
+ JsonObject jsonResponse =
JsonParser.parseString(resp.getData()).getAsJsonObject();
+ for (JsonElement entry : jsonResponse.getAsJsonArray("entry")) {
+ String location =
+ entry
+ .getAsJsonObject()
+ .getAsJsonObject("response")
+ .getAsJsonPrimitive("location")
+ .getAsString();
+ String resourceName =
+ location.substring(location.indexOf("project"),
location.indexOf("/_history"));
+ resourceNames.add(resourceName);
+ }
}
+ return resourceNames;
}
public static void tearDownTempBucket() throws IOException {
-
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
HttpRequestInitializer requestInitializer =
request -> {