lastomato commented on a change in pull request #13395:
URL: https://github.com/apache/beam/pull/13395#discussion_r530527697
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1388,4 +1410,208 @@ public void deidentify(ProcessContext context)
}
}
}
+
+ /** The type Search. */
+ public static class Search extends PTransform<PCollection<KV<String,
Map<String, String>>>, FhirIO.Search.Result> {
+ private static final Logger LOG = LoggerFactory.getLogger(Search.class);
+
+ private final ValueProvider<String> fhirStore;
+
+ /**
+ * Instantiates a new Search.
+ *
+ * @param fhirStore the fhir store
+ */
+ Search(ValueProvider<String> fhirStore) {
+ this.fhirStore = fhirStore;
+ }
+
+ /**
+ * Instantiates a new Search.
+ *
+ * @param fhirStore the fhir store
+ */
+ Search(String fhirStore) {
+ this.fhirStore = StaticValueProvider.of(fhirStore);
+ }
+
+ /** The type Result. */
+ public static class Result implements POutput, PInput {
+ private PCollection<String> resources;
+
+ private PCollection<HealthcareIOError<String>> failedSearches;
+ PCollectionTuple pct;
+
+ /**
+ * Create FhirIO.Search.Result form PCollectionTuple with OUT and
DEAD_LETTER tags.
+ *
+ * @param pct the pct
+ * @return the search result
+ * @throws IllegalArgumentException the illegal argument exception
+ */
+ static FhirIO.Search.Result of(PCollectionTuple pct) throws
IllegalArgumentException {
+ if (pct.getAll()
+ .keySet()
+ .containsAll((Collection<?>)
TupleTagList.of(OUT).and(DEAD_LETTER))) {
+ return new FhirIO.Search.Result(pct);
+ } else {
+ throw new IllegalArgumentException(
+ "The PCollection tuple must have the FhirIO.Search.OUT "
+ + "and FhirIO.Search.DEAD_LETTER tuple tags");
+ }
+ }
+
+ private Result(PCollectionTuple pct) {
+ this.pct = pct;
+ this.resources = pct.get(OUT);
+ this.failedSearches =
+
pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+ }
+
+ /**
+ * Gets failed searches.
+ *
+ * @return the failed searches
+ */
+ public PCollection<HealthcareIOError<String>> getFailedSearches() {
+ return failedSearches;
+ }
+
+ /**
+ * Gets resources.
+ *
+ * @return the resources
+ */
+ public PCollection<String> getResources() {
+ return resources;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return this.pct.getPipeline();
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of(OUT, resources);
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform)
{}
+ }
+
+ /** The tag for the main output of Fhir Messages. */
+ public static final TupleTag<String> OUT = new TupleTag<String>() {};
+ /** The tag for the deadletter output of Fhir Messages. */
+ public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+ new TupleTag<HealthcareIOError<String>>() {};
+
+ @Override
+ public FhirIO.Search.Result expand(PCollection<KV<String, Map<String,
String>>> input) {
+ return input.apply("Fetch Fhir messages", new
SearchResourcesJsonString(this.fhirStore));
+ }
+
+ /**
+ * DoFn to fetch resources from an Google Cloud Healthcare FHIR store
based on search request
+ *
+ * <p>This DoFn consumes a {@link PCollection} of search requests
consisting of resource type
+ * and search parameters, and fetches all matching resources based on the
search criteria and
+ * will output a {@link PCollectionTuple} which contains the output and
dead-letter {@link
+ * PCollection}*.
+ *
+ * <p>The {@link PCollectionTuple} output will contain the following
{@link PCollection}:
+ *
+ * <ul>
+ * <li>{@link FhirIO.Search#OUT} - Contains all {@link PCollection}
records successfully search
+ * from the Fhir store.
+ * <li>{@link FhirIO.Search#DEAD_LETTER} - Contains all {@link
PCollection} of {@link
+ * HealthcareIOError}* of failed searches from the Fhir store, with
+ * error message and stacktrace.
+ * </ul>
+ */
+ static class SearchResourcesJsonString
+ extends PTransform<PCollection<KV<String, Map<String, String>>>,
FhirIO.Search.Result> {
+
+ private final ValueProvider<String> fhirStore;
+
+ /** Instantiates a new Search Fhir resources DoFn. */
+ public SearchResourcesJsonString(ValueProvider<String> fhirStore) {
+ this.fhirStore = fhirStore;
+ }
+
+ @Override
+ public FhirIO.Search.Result expand(PCollection<KV<String, Map<String,
String>>> resourceIds) {
+ return new FhirIO.Search.Result(
+ resourceIds.apply(
+ ParDo.of(new SearchResourcesFn(this.fhirStore))
+ .withOutputTags(FhirIO.Search.OUT,
TupleTagList.of(FhirIO.Search.DEAD_LETTER))));
+ }
+
+ /** DoFn for searching messages from the Fhir store with error handling.
*/
+ static class SearchResourcesFn extends DoFn<KV<String, Map<String,
String>>, String> {
+
+ private Counter failedSearches =
+ Metrics.counter(SearchResourcesFn.class,
"failed-fhir-searches");
+ private static final Logger LOG =
LoggerFactory.getLogger(SearchResourcesFn.class);
+ private final Counter successfulSearches =
+ Metrics.counter(SearchResourcesFn.class,
"successful-fhir-searches");
+ private HealthcareApiClient client;
+ private final ValueProvider<String> fhirStore;
+
+ /** Instantiates a new Fhir resources search fn. */
+ SearchResourcesFn(ValueProvider<String> fhirStore) {
+ this.fhirStore = fhirStore;
+ }
+
+ /**
+ * Instantiate healthcare client.
+ *
+ * @throws IOException the io exception
+ */
+ @Setup
+ public void instantiateHealthcareClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
+ }
+
+ /**
+ * Process element.
+ *
+ * @param context the context
+ */
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ KV<String, Map<String, String>> elementValues = context.element();
+ try {
+ context.output(searchResources(
+ this.client, this.fhirStore.toString(),
elementValues.getKey(), elementValues.getValue()));
+ } catch (Exception e) {
Review comment:
They will be handled by code that knows how to handle them. For example,
the code here probably should not be responsible for an OutOfMemoryError.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]