This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e44a13  Merge pull request #15873 from [BEAM-13181] Remove Sharding 
from FhirIO.Import
5e44a13 is described below

commit 5e44a133ce69c95aa434536b73ae6f757c243cce
Author: Milena Bukal <[email protected]>
AuthorDate: Mon Nov 8 15:19:09 2021 -0500

    Merge pull request #15873 from [BEAM-13181] Remove Sharding from 
FhirIO.Import
    
    * Test FhirIO improvements
    
    * Remove batching from FhirIO.Import
    
    * Minor cleanup
    
    * Fix tmpGcsPath input
---
 .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java  | 236 ++++++++++-----------
 1 file changed, 108 insertions(+), 128 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index a55cfb4..609bb61 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -40,12 +40,9 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
@@ -57,7 +54,6 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.fs.ResourceIdCoder;
 import 
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
@@ -67,12 +63,10 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Wait;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -100,15 +94,15 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Reading</h3>
  *
- * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use 
cases where you have a
- * ${@link PCollection} of message IDs. This is appropriate for reading the 
Fhir notifications from
- * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases 
where you have a manually
- * prepared list of messages that you need to process (e.g. in a text file 
read with {@link
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use 
cases where you have
+ * a ${@link PCollection} of message IDs. This is appropriate for reading the 
Fhir notifications
+ * from a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases 
where you have a
+ * manually prepared list of messages that you need to process (e.g. in a text 
file read with {@link
  * org.apache.beam.sdk.io.TextIO}*) .
  *
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} 
of message ID strings
- * {@link FhirIO.Read.Result} where one can call {@link 
Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched {@link String}s 
and/or {@link
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} 
of message ID
+ * strings {@link FhirIO.Read.Result} where one can call {@link 
Read.Result#getResources()} to
+ * retrieve a {@link PCollection} containing the successfully fetched {@link 
String}s and/or {@link
  * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of 
{@link
  * HealthcareIOError}* containing the resource ID that could not be fetched 
and the exception as a
  * {@link HealthcareIOError}, this can be used to write to the dead letter 
storage system of your
@@ -119,8 +113,8 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Write Resources can be written to FHIR with two different methods: 
Import or Execute Bundle.
  *
- * <p>Execute Bundle This is best for use cases where you are writing to a 
non-empty FHIR store with
- * other clients or otherwise need referential integrity (e.g. A Streaming 
HL7v2 to FHIR ETL
+ * <p>Execute Bundle This is best for use cases where you are writing to a 
non-empty FHIR store
+ * with other clients or otherwise need referential integrity (e.g. A 
Streaming HL7v2 to FHIR ETL
  * pipeline).
  *
  * <p>Import This is best for use cases where you are populating an empty FHIR 
store with no other
@@ -133,34 +127,28 @@ import org.slf4j.LoggerFactory;
  * resources are in ndjson (newline delimited json) of FHIR resources. It is 
important that when
  * using export you give the appropriate permissions to the Google Cloud 
Healthcare Service Agent.
  *
- * <p>Deidentify This is to de-identify FHIR resources from a source FHIR 
store and write the result
- * to a destination FHIR store. It is important that the destination store 
must already exist.
+ * <p>Deidentify This is to de-identify FHIR resources from a source FHIR 
store and write the
+ * result to a destination FHIR store. It is important that the destination 
store must already
+ * exist.
  *
  * <p>Search This is to search FHIR resources within a given FHIR store. The 
inputs are individual
  * FHIR Search queries, represented by the FhirSearchParameter class. The 
outputs are results of
  * each Search, represented as a Json array of FHIR resources in string form, 
with pagination
  * handled, and an optional input key.
  *
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import></a>
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/export></a>
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/deidentify></a>
- * @see <a
- *     
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/search></a>
- *     A {@link PCollection} of {@link String} can be ingested into an Fhir 
store using {@link
- *     FhirIO.Write#fhirStoresImport(String, String, String, 
FhirIO.Import.ContentStructure)} This
- *     will return a {@link FhirIO.Write.Result} on which you can call {@link
- *     FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link 
PCollection} of {@link
- *     HealthcareIOError} containing the {@link String} that failed to be 
ingested and the
- *     exception.
- *     <p>Example
- *     <pre>{@code
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import></a>
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/export></a>
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/deidentify></a>
+ * @see <a 
href=>https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.fhirStores/search></a>
+ * A {@link PCollection} of {@link String} can be ingested into an Fhir store 
using {@link
+ * FhirIO.Write#fhirStoresImport(String, String, String, 
FhirIO.Import.ContentStructure)} This will
+ * return a {@link FhirIO.Write.Result} on which you can call {@link
+ * FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of 
{@link
+ * HealthcareIOError} containing the {@link String} that failed to be ingested 
and the exception.
+ * <p>Example
+ * <pre>{@code
  * Pipeline pipeline = ...
  *
  * // Tail the FHIR store by retrieving resources based on Pub/Sub 
notifications.
@@ -240,6 +228,7 @@ import org.slf4j.LoggerFactory;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class FhirIO {
+
   private static final String BASE_METRIC_PREFIX = "fhirio/";
   private static final String LRO_COUNTER_KEY = "counter";
   private static final String LRO_SUCCESS_KEY = "success";
@@ -415,6 +404,7 @@ public class FhirIO {
 
   /** The type Read. */
   public static class Read extends PTransform<PCollection<String>, 
FhirIO.Read.Result> {
+
     private static final Logger LOG = LoggerFactory.getLogger(Read.class);
 
     /** Instantiates a new Read. */
@@ -422,6 +412,7 @@ public class FhirIO {
 
     /** The type Result. */
     public static class Result implements POutput, PInput {
+
       private PCollection<String> resources;
 
       private PCollection<HealthcareIOError<String>> failedReads;
@@ -628,6 +619,7 @@ public class FhirIO {
 
     /** The type Result. */
     public static class Result implements POutput {
+
       private final Pipeline pipeline;
       private final PCollection<String> successfulBodies;
       private final PCollection<HealthcareIOError<String>> failedBodies;
@@ -932,12 +924,12 @@ public class FhirIO {
    */
   public static class Import extends Write {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Import.class);
+
     private final ValueProvider<String> fhirStore;
     private final ValueProvider<String> deadLetterGcsPath;
+    private final ValueProvider<String> tempGcsPath;
     private final ContentStructure contentStructure;
-    private static final int DEFAULT_FILES_PER_BATCH = 10000;
-    private static final Logger LOG = LoggerFactory.getLogger(Import.class);
-    private ValueProvider<String> tempGcsPath;
 
     /*
      * Instantiates a new Import.
@@ -1029,29 +1021,22 @@ public class FhirIO {
           getImportGcsTempPath()
               
.orElse(StaticValueProvider.of(input.getPipeline().getOptions().getTempLocation()));
 
-      // Write bundles of String to GCS
+      // Write input json in batches to GCS.
       PCollectionTuple writeTmpFileResults =
           input.apply(
-              "Write nd json to GCS",
-              ParDo.of(new WriteBundlesToFilesFn(fhirStore, tempPath, 
deadLetterGcsPath))
+              "Write input to GCS",
+              ParDo.of(new WriteBatchToFilesFn(tempGcsPath))
                   .withOutputTags(Write.TEMP_FILES, 
TupleTagList.of(Write.FAILED_BODY)));
 
       PCollection<HealthcareIOError<String>> failedBodies =
           writeTmpFileResults
               .get(Write.FAILED_BODY)
               .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
-      int numShards = 100;
+
       PCollection<HealthcareIOError<String>> failedFiles =
           writeTmpFileResults
               .get(Write.TEMP_FILES)
               .apply(
-                  "Shard files", // to paralelize group into batches
-                  WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, 
numShards)))
-              .setCoder(KvCoder.of(TextualIntegerCoder.of(), 
ResourceIdCoder.of()))
-              .apply("Assemble File Batches", 
GroupIntoBatches.ofSize(DEFAULT_FILES_PER_BATCH))
-              .setCoder(
-                  KvCoder.of(TextualIntegerCoder.of(), 
IterableCoder.of(ResourceIdCoder.of())))
-              .apply(
                   "Import Batches",
                   ParDo.of(new ImportFn(fhirStore, tempPath, 
deadLetterGcsPath, contentStructure)))
               .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
@@ -1094,71 +1079,51 @@ public class FhirIO {
       return Write.Result.in(input.getPipeline(), failedBodies, failedFiles);
     }
 
-    /** The Write bundles to new line delimited json files. */
-    static class WriteBundlesToFilesFn extends DoFn<String, ResourceId> {
+    /**
+     * WriteToFilesFn writes the input JSON to files in the provided temporary 
GCS location.
+     * Multiple inputs are written to the same file, and therefore the input 
JSON is additionally
+     * processed into NDJSON. The size of a single file is determined 
according to the size of a
+     * batch's window (large for bounded PCollections).
+     */
+    static class WriteBatchToFilesFn extends DoFn<String, ResourceId> {
 
-      private final ValueProvider<String> fhirStore;
       private final ValueProvider<String> tempGcsPath;
-      private final ValueProvider<String> deadLetterGcsPath;
+
       private ObjectMapper mapper;
+
       private ResourceId resourceId;
       private WritableByteChannel ndJsonChannel;
       private BoundedWindow window;
 
-      private transient HealthcareApiClient client;
-      private static final Logger LOG = 
LoggerFactory.getLogger(WriteBundlesToFilesFn.class);
-
-      WriteBundlesToFilesFn(
-          ValueProvider<String> fhirStore,
-          ValueProvider<String> tempGcsPath,
-          ValueProvider<String> deadLetterGcsPath) {
-        this.fhirStore = fhirStore;
-        this.tempGcsPath = tempGcsPath;
-        this.deadLetterGcsPath = deadLetterGcsPath;
-      }
-
       /**
-       * Instantiates a new Import fn.
+       * Writes batches of NDJSON to the gcs path.
        *
-       * @param fhirStore the fhir store
-       * @param tempGcsPath the temp gcs path
-       * @param deadLetterGcsPath the dead letter gcs path
+       * @param tempGcsPath the gcs path to write files to
        */
-      WriteBundlesToFilesFn(String fhirStore, String tempGcsPath, String 
deadLetterGcsPath) {
-        this.fhirStore = StaticValueProvider.of(fhirStore);
-        this.tempGcsPath = StaticValueProvider.of(tempGcsPath);
-        this.deadLetterGcsPath = StaticValueProvider.of(deadLetterGcsPath);
+      WriteBatchToFilesFn(ValueProvider<String> tempGcsPath) {
+        this.tempGcsPath = tempGcsPath;
       }
 
-      /**
-       * Init client.
-       *
-       * @throws IOException the io exception
-       */
       @Setup
-      public void initClient() throws IOException {
-        this.client = new HttpHealthcareApiClient();
+      public void init() throws IOException {
+        this.mapper = new ObjectMapper();
       }
 
       /**
-       * Init batch.
+       * Init the NDJSON file for the current batch.
        *
        * @throws IOException the io exception
        */
       @StartBundle
       public void initFile() throws IOException {
-        // Write each bundle to newline delimited JSON file.
-        String filename = String.format("fhirImportBatch-%s.ndjson", 
UUID.randomUUID().toString());
+        String filename = String.format("fhirImportBatch-%s.ndjson", 
UUID.randomUUID());
         ResourceId tempDir = 
FileSystems.matchNewResource(this.tempGcsPath.get(), true);
         this.resourceId = tempDir.resolve(filename, 
StandardResolveOptions.RESOLVE_FILE);
         this.ndJsonChannel = FileSystems.create(resourceId, 
"application/ld+json");
-        if (mapper == null) {
-          this.mapper = new ObjectMapper();
-        }
       }
 
       /**
-       * Add to batch.
+       * Add the input JSON to the batch, converting to NDJSON in the process.
        *
        * @param context the context
        * @throws IOException the io exception
@@ -1177,7 +1142,7 @@ public class FhirIO {
               String.format(
                   "Failed to parse payload: %s as json at: %s : %s."
                       + "Dropping message from batch import.",
-                  httpBody.toString(), e.getLocation().getCharOffset(), 
e.getMessage());
+                  httpBody, e.getLocation().getCharOffset(), e.getMessage());
           LOG.warn(resource);
           context.output(
               Write.FAILED_BODY, HealthcareIOError.of(httpBody, new 
IOException(resource)));
@@ -1192,27 +1157,33 @@ public class FhirIO {
        */
       @FinishBundle
       public void closeFile(FinishBundleContext context) throws IOException {
-        // Write the file with all elements in this bundle to GCS.
+        // Write the file with all elements in this batch to GCS.
         ndJsonChannel.close();
         context.output(resourceId, window.maxTimestamp(), window);
       }
     }
 
     /** Import batches of new line delimited json files to FHIR Store. */
-    static class ImportFn
-        extends DoFn<KV<Integer, Iterable<ResourceId>>, 
HealthcareIOError<String>> {
+    static class ImportFn extends DoFn<ResourceId, HealthcareIOError<String>> {
 
       private static final Counter IMPORT_ERRORS =
           Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX + 
"resources_imported_failure_count");
       private static final Counter IMPORT_SUCCESS =
           Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX + 
"resources_imported_success_count");
       private static final Logger LOG = 
LoggerFactory.getLogger(ImportFn.class);
+
+      private final ValueProvider<String> fhirStore;
       private final ValueProvider<String> tempGcsPath;
       private final ValueProvider<String> deadLetterGcsPath;
-      private ResourceId tempDir;
       private final ContentStructure contentStructure;
+
+      private ResourceId tempDir;
       private HealthcareApiClient client;
-      private final ValueProvider<String> fhirStore;
+
+      private BoundedWindow window;
+      private List<ResourceId> files;
+      private List<ResourceId> tempDestinations;
+      private List<ResourceId> deadLetterDestinations;
 
       ImportFn(
           ValueProvider<String> fhirStore,
@@ -1231,41 +1202,48 @@ public class FhirIO {
 
       @Setup
       public void init() throws IOException {
+        client = new HttpHealthcareApiClient();
+      }
+
+      @StartBundle
+      public void initBatch() {
         tempDir =
             FileSystems.matchNewResource(tempGcsPath.get(), true)
                 .resolve(
-                    String.format("tmp-%s", UUID.randomUUID().toString()),
+                    String.format("tmp-%s", UUID.randomUUID()),
                     StandardResolveOptions.RESOLVE_DIRECTORY);
-        client = new HttpHealthcareApiClient();
+
+        files = new ArrayList<>();
+        tempDestinations = new ArrayList<>();
+        deadLetterDestinations = new ArrayList<>();
       }
 
-      /**
-       * Move files to a temporary subdir (to provide common prefix) to 
execute import with single
-       * GCS URI.
-       */
       @ProcessElement
-      public void importBatch(
-          @Element KV<Integer, Iterable<ResourceId>> element,
-          OutputReceiver<HealthcareIOError<String>> output)
-          throws IOException {
-        Iterable<ResourceId> batch = element.getValue();
-        List<ResourceId> tempDestinations = new ArrayList<>();
-        List<ResourceId> deadLetterDestinations = new ArrayList<>();
-        assert batch != null;
-        for (ResourceId file : batch) {
-          tempDestinations.add(
-              tempDir.resolve(file.getFilename(), 
StandardResolveOptions.RESOLVE_FILE));
-          deadLetterDestinations.add(
-              FileSystems.matchNewResource(deadLetterGcsPath.get(), true)
-                  .resolve(file.getFilename(), 
StandardResolveOptions.RESOLVE_FILE));
-        }
-        // Ignore missing files since this might be a retry, which means files
-        // should have been copied over.
-        FileSystems.copy(
-            ImmutableList.copyOf(batch),
+      public void process(ProcessContext context, BoundedWindow window) throws 
IOException {
+        this.window = window;
+
+        ResourceId file = context.element();
+        assert file != null;
+        files.add(file);
+        tempDestinations.add(
+            tempDir.resolve(file.getFilename(), 
StandardResolveOptions.RESOLVE_FILE));
+        deadLetterDestinations.add(
+            FileSystems.matchNewResource(deadLetterGcsPath.get(), true)
+                .resolve(file.getFilename(), 
StandardResolveOptions.RESOLVE_FILE));
+      }
+
+      @FinishBundle
+      public void importBatch(FinishBundleContext context) throws IOException {
+        // Move files to a temporary subdir (to provide common prefix) to 
execute import with single
+        // GCS URI and allow for retries.
+        // IGNORE_MISSING_FILES ignores missing source files, we enable this 
as if this is a retry
+        // files should have already been moved over.
+        FileSystems.rename(
+            ImmutableList.copyOf(files),
             tempDestinations,
             StandardMoveOptions.IGNORE_MISSING_FILES);
-        // Check whether any temporary files are not present.
+        // Even in a retry we need to check that all temporary files are 
present in the temporary
+        // destination.
         boolean hasMissingFile =
             FileSystems.matchResources(tempDestinations).stream()
                 .anyMatch((MatchResult r) -> r.status() != Status.OK);
@@ -1273,6 +1251,7 @@ public class FhirIO {
           throw new IllegalStateException("Not all temporary files are present 
for importing.");
         }
         ResourceId importUri = tempDir.resolve("*", 
StandardResolveOptions.RESOLVE_FILE);
+
         try {
           // Blocking fhirStores.import request.
           assert contentStructure != null;
@@ -1290,14 +1269,10 @@ public class FhirIO {
           LOG.warn(
               String.format(
                   "Failed to import %s with error: %s. Moving to deadletter 
path %s",
-                  importUri.toString(), e.getMessage(), 
deadLetterResourceId.toString()));
+                  importUri, e.getMessage(), deadLetterResourceId.toString()));
           FileSystems.rename(tempDestinations, deadLetterDestinations);
-          output.output(HealthcareIOError.of(importUri.toString(), e));
-        } finally {
-          // If we've reached this point files have either been successfully 
import to FHIR store
-          // or moved to Dead Letter Queue.
-          // Clean up original files for this batch on GCS.
-          FileSystems.delete(ImmutableList.copyOf(batch));
+          context.output(
+              HealthcareIOError.of(importUri.toString(), e), 
window.maxTimestamp(), window);
         }
       }
     }
@@ -1326,6 +1301,7 @@ public class FhirIO {
 
   /** The type Execute bundles. */
   public static class ExecuteBundles extends PTransform<PCollection<String>, 
Write.Result> {
+
     private final ValueProvider<String> fhirStore;
 
     /**
@@ -1415,6 +1391,7 @@ public class FhirIO {
 
   /** Export FHIR resources from a FHIR store to new line delimited json files 
on GCS. */
   public static class Export extends PTransform<PBegin, PCollection<String>> {
+
     private final ValueProvider<String> fhirStore;
     private final ValueProvider<String> exportGcsUriPrefix;
 
@@ -1477,6 +1454,7 @@ public class FhirIO {
 
   /** Deidentify FHIR resources from a FHIR store to a destination FHIR store. 
*/
   public static class Deidentify extends PTransform<PBegin, 
PCollection<String>> {
+
     private final ValueProvider<String> sourceFhirStore;
     private final ValueProvider<String> destinationFhirStore;
     private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1547,6 +1525,7 @@ public class FhirIO {
   /** The type Search. */
   public static class Search<T>
       extends PTransform<PCollection<FhirSearchParameter<T>>, 
FhirIO.Search.Result> {
+
     private static final Logger LOG = LoggerFactory.getLogger(Search.class);
 
     private final ValueProvider<String> fhirStore;
@@ -1560,6 +1539,7 @@ public class FhirIO {
     }
 
     public static class Result implements POutput, PInput {
+
       private PCollection<KV<String, JsonArray>> keyedResources;
       private PCollection<JsonArray> resources;
 

Reply via email to