This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5e44a13 Merge pull request #15873 from [BEAM-13181] Remove Sharding
from FhirIO.Import
5e44a13 is described below
commit 5e44a133ce69c95aa434536b73ae6f757c243cce
Author: Milena Bukal <[email protected]>
AuthorDate: Mon Nov 8 15:19:09 2021 -0500
Merge pull request #15873 from [BEAM-13181] Remove Sharding from
FhirIO.Import
* Test FhirIO improvements
* Remove batching from FhirIO.Import
* Minor cleanup
* Fix tmpGcsPath input
---
.../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 236 ++++++++++-----------
1 file changed, 108 insertions(+), 128 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index a55cfb4..609bb61 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -40,12 +40,9 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
@@ -57,7 +54,6 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
@@ -67,12 +63,10 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
-import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -100,15 +94,15 @@ import org.slf4j.LoggerFactory;
*
* <h3>Reading</h3>
*
- * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use
cases where you have a
- * ${@link PCollection} of message IDs. This is appropriate for reading the
Fhir notifications from
- * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases
where you have a manually
- * prepared list of messages that you need to process (e.g. in a text file
read with {@link
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use
cases where you have
+ * a {@link PCollection} of message IDs. This is appropriate for reading the
Fhir notifications
+ * from a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases
where you have a
+ * manually prepared list of messages that you need to process (e.g. in a text
file read with {@link
* org.apache.beam.sdk.io.TextIO}*) .
*
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection}
of message ID strings
- * {@link FhirIO.Read.Result} where one can call {@link
Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched {@link String}s
and/or {@link
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection}
of message ID
+ * strings {@link FhirIO.Read.Result} where one can call {@link
Read.Result#getResources()} to
+ * retrieve a {@link PCollection} containing the successfully fetched {@link
String}s and/or {@link
* FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of
{@link
* HealthcareIOError}* containing the resource ID that could not be fetched
and the exception as a
* {@link HealthcareIOError}, this can be used to write to the dead letter
storage system of your
@@ -119,8 +113,8 @@ import org.slf4j.LoggerFactory;
*
* <p>Write Resources can be written to FHIR with two different methods:
Import or Execute Bundle.
*
- * <p>Execute Bundle This is best for use cases where you are writing to a
non-empty FHIR store with
- * other clients or otherwise need referential integrity (e.g. A Streaming
HL7v2 to FHIR ETL
+ * <p>Execute Bundle This is best for use cases where you are writing to a
non-empty FHIR store
+ * with other clients or otherwise need referential integrity (e.g. A
Streaming HL7v2 to FHIR ETL
* pipeline).
*
* <p>Import This is best for use cases where you are populating an empty FHIR
store with no other
@@ -133,34 +127,28 @@ import org.slf4j.LoggerFactory;
* resources are in ndjson (newline delimited json) of FHIR resources. It is
important that when
* using export you give the appropriate permissions to the Google Cloud
Healthcare Service Agent.
*
- * <p>Deidentify This is to de-identify FHIR resources from a source FHIR
store and write the result
- * to a destination FHIR store. It is important that the destination store
must already exist.
+ * <p>Deidentify This is to de-identify FHIR resources from a source FHIR
store and write the
+ * result to a destination FHIR store. It is important that the destination
store must already
+ * exist.
*
* <p>Search This is to search FHIR resources within a given FHIR store. The
inputs are individual
* FHIR Search queries, represented by the FhirSearchParameter class. The
outputs are results of
* each Search, represented as a Json array of FHIR resources in string form,
with pagination
* handled, and an optional input key.
*
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import></a>
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/export></a>
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/deidentify></a>
- * @see <a
- *
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/search></a>
- * A {@link PCollection} of {@link String} can be ingested into an Fhir
store using {@link
- * FhirIO.Write#fhirStoresImport(String, String, String,
FhirIO.Import.ContentStructure)} This
- * will return a {@link FhirIO.Write.Result} on which you can call {@link
- * FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link
PCollection} of {@link
- * HealthcareIOError} containing the {@link String} that failed to be
ingested and the
- * exception.
- * <p>Example
- * <pre>{@code
+ * @see <a href="https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle">fhirStores.fhir.executeBundle</a>
+ * @see <a href="https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions">FHIR store Cloud Storage permissions</a>
+ * @see <a href="https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import">fhirStores.import</a>
+ * @see <a href="https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/export">fhirStores.export</a>
+ * @see <a href="https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/deidentify">fhirStores.deidentify</a>
+ * @see <a href="https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/search">fhirStores.search</a>
+ * A {@link PCollection} of {@link String} can be ingested into a FHIR store using {@link
+ * FhirIO.Write#fhirStoresImport(String, String, String, FhirIO.Import.ContentStructure)}. This
+ * will return a {@link FhirIO.Write.Result} on which you can call {@link
+ * FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the {@link String} that failed to be ingested and the exception.
+ * <p>Example
+ * <pre>{@code
* Pipeline pipeline = ...
*
* // Tail the FHIR store by retrieving resources based on Pub/Sub
notifications.
@@ -240,6 +228,7 @@ import org.slf4j.LoggerFactory;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class FhirIO {
+
private static final String BASE_METRIC_PREFIX = "fhirio/";
private static final String LRO_COUNTER_KEY = "counter";
private static final String LRO_SUCCESS_KEY = "success";
@@ -415,6 +404,7 @@ public class FhirIO {
/** The type Read. */
public static class Read extends PTransform<PCollection<String>,
FhirIO.Read.Result> {
+
private static final Logger LOG = LoggerFactory.getLogger(Read.class);
/** Instantiates a new Read. */
@@ -422,6 +412,7 @@ public class FhirIO {
/** The type Result. */
public static class Result implements POutput, PInput {
+
private PCollection<String> resources;
private PCollection<HealthcareIOError<String>> failedReads;
@@ -628,6 +619,7 @@ public class FhirIO {
/** The type Result. */
public static class Result implements POutput {
+
private final Pipeline pipeline;
private final PCollection<String> successfulBodies;
private final PCollection<HealthcareIOError<String>> failedBodies;
@@ -932,12 +924,12 @@ public class FhirIO {
*/
public static class Import extends Write {
+ private static final Logger LOG = LoggerFactory.getLogger(Import.class);
+
private final ValueProvider<String> fhirStore;
private final ValueProvider<String> deadLetterGcsPath;
+ private final ValueProvider<String> tempGcsPath;
private final ContentStructure contentStructure;
- private static final int DEFAULT_FILES_PER_BATCH = 10000;
- private static final Logger LOG = LoggerFactory.getLogger(Import.class);
- private ValueProvider<String> tempGcsPath;
/*
* Instantiates a new Import.
@@ -1029,29 +1021,22 @@ public class FhirIO {
getImportGcsTempPath()
.orElse(StaticValueProvider.of(input.getPipeline().getOptions().getTempLocation()));
- // Write bundles of String to GCS
+ // Write input json in batches to GCS.
PCollectionTuple writeTmpFileResults =
input.apply(
- "Write nd json to GCS",
- ParDo.of(new WriteBundlesToFilesFn(fhirStore, tempPath,
deadLetterGcsPath))
+ "Write input to GCS",
+ ParDo.of(new WriteBatchToFilesFn(tempGcsPath))
.withOutputTags(Write.TEMP_FILES,
TupleTagList.of(Write.FAILED_BODY)));
PCollection<HealthcareIOError<String>> failedBodies =
writeTmpFileResults
.get(Write.FAILED_BODY)
.setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
- int numShards = 100;
+
PCollection<HealthcareIOError<String>> failedFiles =
writeTmpFileResults
.get(Write.TEMP_FILES)
.apply(
- "Shard files", // to paralelize group into batches
- WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0,
numShards)))
- .setCoder(KvCoder.of(TextualIntegerCoder.of(),
ResourceIdCoder.of()))
- .apply("Assemble File Batches",
GroupIntoBatches.ofSize(DEFAULT_FILES_PER_BATCH))
- .setCoder(
- KvCoder.of(TextualIntegerCoder.of(),
IterableCoder.of(ResourceIdCoder.of())))
- .apply(
"Import Batches",
ParDo.of(new ImportFn(fhirStore, tempPath,
deadLetterGcsPath, contentStructure)))
.setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
@@ -1094,71 +1079,51 @@ public class FhirIO {
return Write.Result.in(input.getPipeline(), failedBodies, failedFiles);
}
- /** The Write bundles to new line delimited json files. */
- static class WriteBundlesToFilesFn extends DoFn<String, ResourceId> {
+ /**
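+ * WriteBatchToFilesFn writes the input JSON to files in the provided temporary GCS location.
+ * Multiple inputs are written to the same file, and therefore the input JSON is additionally
+ * processed into NDJSON. One file is opened per bundle (in {@code @StartBundle}) and closed in
+ * {@code @FinishBundle}, so the size of a single file is determined by the size of the bundle.
+ * Bundles, and therefore files, tend to be larger for bounded PCollections, though this is
+ * runner-dependent.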
+ */
+ static class WriteBatchToFilesFn extends DoFn<String, ResourceId> {
- private final ValueProvider<String> fhirStore;
private final ValueProvider<String> tempGcsPath;
- private final ValueProvider<String> deadLetterGcsPath;
+
private ObjectMapper mapper;
+
private ResourceId resourceId;
private WritableByteChannel ndJsonChannel;
private BoundedWindow window;
- private transient HealthcareApiClient client;
- private static final Logger LOG =
LoggerFactory.getLogger(WriteBundlesToFilesFn.class);
-
- WriteBundlesToFilesFn(
- ValueProvider<String> fhirStore,
- ValueProvider<String> tempGcsPath,
- ValueProvider<String> deadLetterGcsPath) {
- this.fhirStore = fhirStore;
- this.tempGcsPath = tempGcsPath;
- this.deadLetterGcsPath = deadLetterGcsPath;
- }
-
/**
- * Instantiates a new Import fn.
+ * Instantiates a DoFn that writes batches of NDJSON to the given GCS path.
*
- * @param fhirStore the fhir store
- * @param tempGcsPath the temp gcs path
- * @param deadLetterGcsPath the dead letter gcs path
+ * @param tempGcsPath the gcs path to write files to
*/
- WriteBundlesToFilesFn(String fhirStore, String tempGcsPath, String
deadLetterGcsPath) {
- this.fhirStore = StaticValueProvider.of(fhirStore);
- this.tempGcsPath = StaticValueProvider.of(tempGcsPath);
- this.deadLetterGcsPath = StaticValueProvider.of(deadLetterGcsPath);
+ WriteBatchToFilesFn(ValueProvider<String> tempGcsPath) {
+ this.tempGcsPath = tempGcsPath;
}
- /**
- * Init client.
- *
- * @throws IOException the io exception
- */
@Setup
- public void initClient() throws IOException {
- this.client = new HttpHealthcareApiClient();
+ public void init() throws IOException {
+ this.mapper = new ObjectMapper();
}
/**
- * Init batch.
+ * Init the NDJSON file for the current batch.
*
* @throws IOException the io exception
*/
@StartBundle
public void initFile() throws IOException {
- // Write each bundle to newline delimited JSON file.
- String filename = String.format("fhirImportBatch-%s.ndjson",
UUID.randomUUID().toString());
+ String filename = String.format("fhirImportBatch-%s.ndjson",
UUID.randomUUID());
ResourceId tempDir =
FileSystems.matchNewResource(this.tempGcsPath.get(), true);
this.resourceId = tempDir.resolve(filename,
StandardResolveOptions.RESOLVE_FILE);
this.ndJsonChannel = FileSystems.create(resourceId,
"application/ld+json");
- if (mapper == null) {
- this.mapper = new ObjectMapper();
- }
}
/**
- * Add to batch.
+ * Add the input JSON to the batch, converting to NDJSON in the process.
*
* @param context the context
* @throws IOException the io exception
@@ -1177,7 +1142,7 @@ public class FhirIO {
String.format(
"Failed to parse payload: %s as json at: %s : %s."
+ "Dropping message from batch import.",
- httpBody.toString(), e.getLocation().getCharOffset(),
e.getMessage());
+ httpBody, e.getLocation().getCharOffset(), e.getMessage());
LOG.warn(resource);
context.output(
Write.FAILED_BODY, HealthcareIOError.of(httpBody, new
IOException(resource)));
@@ -1192,27 +1157,33 @@ public class FhirIO {
*/
@FinishBundle
public void closeFile(FinishBundleContext context) throws IOException {
- // Write the file with all elements in this bundle to GCS.
+ // Write the file with all elements in this batch to GCS.
ndJsonChannel.close();
context.output(resourceId, window.maxTimestamp(), window);
}
}
/** Import batches of new line delimited json files to FHIR Store. */
- static class ImportFn
- extends DoFn<KV<Integer, Iterable<ResourceId>>,
HealthcareIOError<String>> {
+ static class ImportFn extends DoFn<ResourceId, HealthcareIOError<String>> {
private static final Counter IMPORT_ERRORS =
Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX +
"resources_imported_failure_count");
private static final Counter IMPORT_SUCCESS =
Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX +
"resources_imported_success_count");
private static final Logger LOG =
LoggerFactory.getLogger(ImportFn.class);
+
+ private final ValueProvider<String> fhirStore;
private final ValueProvider<String> tempGcsPath;
private final ValueProvider<String> deadLetterGcsPath;
- private ResourceId tempDir;
private final ContentStructure contentStructure;
+
+ private ResourceId tempDir;
private HealthcareApiClient client;
- private final ValueProvider<String> fhirStore;
+
+ private BoundedWindow window;
+ private List<ResourceId> files;
+ private List<ResourceId> tempDestinations;
+ private List<ResourceId> deadLetterDestinations;
ImportFn(
ValueProvider<String> fhirStore,
@@ -1231,41 +1202,48 @@ public class FhirIO {
@Setup
public void init() throws IOException {
+ client = new HttpHealthcareApiClient();
+ }
+
+ @StartBundle
+ public void initBatch() {
tempDir =
FileSystems.matchNewResource(tempGcsPath.get(), true)
.resolve(
- String.format("tmp-%s", UUID.randomUUID().toString()),
+ String.format("tmp-%s", UUID.randomUUID()),
StandardResolveOptions.RESOLVE_DIRECTORY);
- client = new HttpHealthcareApiClient();
+
+ files = new ArrayList<>();
+ tempDestinations = new ArrayList<>();
+ deadLetterDestinations = new ArrayList<>();
}
- /**
- * Move files to a temporary subdir (to provide common prefix) to
execute import with single
- * GCS URI.
- */
@ProcessElement
- public void importBatch(
- @Element KV<Integer, Iterable<ResourceId>> element,
- OutputReceiver<HealthcareIOError<String>> output)
- throws IOException {
- Iterable<ResourceId> batch = element.getValue();
- List<ResourceId> tempDestinations = new ArrayList<>();
- List<ResourceId> deadLetterDestinations = new ArrayList<>();
- assert batch != null;
- for (ResourceId file : batch) {
- tempDestinations.add(
- tempDir.resolve(file.getFilename(),
StandardResolveOptions.RESOLVE_FILE));
- deadLetterDestinations.add(
- FileSystems.matchNewResource(deadLetterGcsPath.get(), true)
- .resolve(file.getFilename(),
StandardResolveOptions.RESOLVE_FILE));
- }
- // Ignore missing files since this might be a retry, which means files
- // should have been copied over.
- FileSystems.copy(
- ImmutableList.copyOf(batch),
+ public void process(ProcessContext context, BoundedWindow window) throws
IOException {
+ this.window = window;
+
+ ResourceId file = context.element();
+ assert file != null;
+ files.add(file);
+ tempDestinations.add(
+ tempDir.resolve(file.getFilename(),
StandardResolveOptions.RESOLVE_FILE));
+ deadLetterDestinations.add(
+ FileSystems.matchNewResource(deadLetterGcsPath.get(), true)
+ .resolve(file.getFilename(),
StandardResolveOptions.RESOLVE_FILE));
+ }
+
+ @FinishBundle
+ public void importBatch(FinishBundleContext context) throws IOException {
+ // Move files to a temporary subdir (to provide common prefix) to
execute import with single
+ // GCS URI and allow for retries.
+ // IGNORE_MISSING_FILES ignores missing source files. We enable it because, if this is a retry,
+ // the files should already have been moved over.
+ FileSystems.rename(
+ ImmutableList.copyOf(files),
tempDestinations,
StandardMoveOptions.IGNORE_MISSING_FILES);
- // Check whether any temporary files are not present.
+ // Even in a retry we need to check that all temporary files are
present in the temporary
+ // destination.
boolean hasMissingFile =
FileSystems.matchResources(tempDestinations).stream()
.anyMatch((MatchResult r) -> r.status() != Status.OK);
@@ -1273,6 +1251,7 @@ public class FhirIO {
throw new IllegalStateException("Not all temporary files are present
for importing.");
}
ResourceId importUri = tempDir.resolve("*",
StandardResolveOptions.RESOLVE_FILE);
+
try {
// Blocking fhirStores.import request.
assert contentStructure != null;
@@ -1290,14 +1269,10 @@ public class FhirIO {
LOG.warn(
String.format(
"Failed to import %s with error: %s. Moving to deadletter
path %s",
- importUri.toString(), e.getMessage(),
deadLetterResourceId.toString()));
+ importUri, e.getMessage(), deadLetterResourceId.toString()));
FileSystems.rename(tempDestinations, deadLetterDestinations);
- output.output(HealthcareIOError.of(importUri.toString(), e));
- } finally {
- // If we've reached this point files have either been successfully
import to FHIR store
- // or moved to Dead Letter Queue.
- // Clean up original files for this batch on GCS.
- FileSystems.delete(ImmutableList.copyOf(batch));
+ context.output(
+ HealthcareIOError.of(importUri.toString(), e),
window.maxTimestamp(), window);
}
}
}
@@ -1326,6 +1301,7 @@ public class FhirIO {
/** The type Execute bundles. */
public static class ExecuteBundles extends PTransform<PCollection<String>,
Write.Result> {
+
private final ValueProvider<String> fhirStore;
/**
@@ -1415,6 +1391,7 @@ public class FhirIO {
/** Export FHIR resources from a FHIR store to new line delimited json files
on GCS. */
public static class Export extends PTransform<PBegin, PCollection<String>> {
+
private final ValueProvider<String> fhirStore;
private final ValueProvider<String> exportGcsUriPrefix;
@@ -1477,6 +1454,7 @@ public class FhirIO {
/** Deidentify FHIR resources from a FHIR store to a destination FHIR store.
*/
public static class Deidentify extends PTransform<PBegin,
PCollection<String>> {
+
private final ValueProvider<String> sourceFhirStore;
private final ValueProvider<String> destinationFhirStore;
private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1547,6 +1525,7 @@ public class FhirIO {
/** The type Search. */
public static class Search<T>
extends PTransform<PCollection<FhirSearchParameter<T>>,
FhirIO.Search.Result> {
+
private static final Logger LOG = LoggerFactory.getLogger(Search.class);
private final ValueProvider<String> fhirStore;
@@ -1560,6 +1539,7 @@ public class FhirIO {
}
public static class Result implements POutput, PInput {
+
private PCollection<KV<String, JsonArray>> keyedResources;
private PCollection<JsonArray> resources;