[
https://issues.apache.org/jira/browse/BEAM-14504?focusedWorklogId=775546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775546
]
ASF GitHub Bot logged work on BEAM-14504:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/May/22 18:03
Start Date: 27/May/22 18:03
Worklog Time Spent: 10m
Work Description: msbukal commented on code in PR #17741:
URL: https://github.com/apache/beam/pull/17741#discussion_r883823754
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.auto.value.AutoValue;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+@AutoValue
+public abstract class FhirBundleResponse {
+
+ static FhirBundleResponse.Builder builder() {
+ return new AutoValue_FhirBundleResponse.Builder();
+ }
+
+ /**
+ * String representing the metadata of the Bundle to be written. Used to pass metadata through the
+ * ExecuteBundles PTransform.
+ */
+ public abstract FhirBundleParameter getFhirBundleParameter();
+
+ /** FHIR R4 bundle resource object as a string. */
+ public abstract String getResponse();
+
+ public static FhirBundleResponse of(
+ FhirBundleParameter fhirBundleParameter, @Nullable String response) {
+ return FhirBundleResponse.builder()
+ .setFhirBundleParameter(fhirBundleParameter)
+ .setResponse(Objects.toString(response, ""))
+ .build();
+ }
+
+ public static FhirBundleResponse of(FhirBundleParameter fhirBundleParameter) {
Review Comment:
Do we need this constructor?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java:
##########
@@ -1349,7 +1376,15 @@ public enum ContentStructure {
}
/** The type Execute bundles. */
- public static class ExecuteBundles extends Write {
+ public static class ExecuteBundles
+ extends PTransform<PCollection<FhirBundleParameter>, ExecuteBundlesResult> {
+
+ /** The TupleTag used for bundles that were executed successfully. */
+ public static final TupleTag<FhirBundleResponse> SUCCESSFUL_BUNDLES = new TupleTag<>();
+
+ /** The TupleTag used for bundles that failed to be executed for any reason. */
+ public static final TupleTag<HealthcareIOError<FhirBundleResponse>> FAILED_BUNDLES =
Review Comment:
I'd keep the HealthcareIOError as HealthcareIOError<FhirBundleParameter>
since you aren't adding a response to the FhirBundleResponse, so a user might
be confused why the response field is empty.
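A minimal sketch of the typing suggested above, assuming hypothetical stand-in classes (`BundleParameter`, `IoError`, `FailedBundleSketch`) rather than the real `FhirBundleParameter` and `HealthcareIOError`. It only illustrates the idea: the error wraps the input that failed, so there is no empty response field for a user to puzzle over.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Beam classes.
final class BundleParameter {
  final String metadata;
  final String bundle;

  BundleParameter(String metadata, String bundle) {
    this.metadata = metadata;
    this.bundle = bundle;
  }
}

// Generic error wrapper: T is the element that could not be processed.
final class IoError<T> {
  final T dataResource;
  final int statusCode;
  final String errorMessage;

  private IoError(T dataResource, int statusCode, String errorMessage) {
    this.dataResource = dataResource;
    this.statusCode = statusCode;
    this.errorMessage = errorMessage;
  }

  static <T> IoError<T> of(T dataResource, int statusCode, String errorMessage) {
    return new IoError<>(dataResource, statusCode, errorMessage);
  }
}

final class FailedBundleSketch {
  // The failure carries the original input, not a response object with a blank response.
  static IoError<BundleParameter> toError(BundleParameter input, int statusCode, String body) {
    return IoError.of(input, statusCode, body);
  }

  public static void main(String[] args) {
    BundleParameter input =
        new BundleParameter("messageId=123", "{\"resourceType\":\"Bundle\"}");
    IoError<BundleParameter> error = toError(input, 400, "invalid bundle");
    System.out.println(error.statusCode + ": " + error.dataResource.metadata);
  }
}
```

With this shape, a consumer of the failed output reads the input and the error details directly and never sees a response placeholder.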
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java:
##########
@@ -1492,20 +1513,23 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re
// 20X's are successes, otherwise failure.
if (statusCode / 100 == 2) {
success++;
- context.output(Write.SUCCESSFUL_BODY, entry.toString());
+ context.output(
+ SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString()));
} else {
fail++;
context.output(
- Write.FAILED_BODY,
+ FAILED_BUNDLES,
HealthcareIOError.of(
- inputBody, HealthcareHttpException.of(statusCode, entry.toString())));
+ FhirBundleResponse.of(context.element()),
+ HealthcareHttpException.of(statusCode, entry.toString())));
}
}
EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success);
EXECUTE_BUNDLE_RESOURCE_ERRORS.inc(fail);
} else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) {
EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size());
- context.output(Write.SUCCESSFUL_BODY, bundle.toString());
+ context.output(
+ SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.toString()));
Review Comment:
bundle.toString() -> bundle.getAsString()
I find getAsString tends to be cleaner, and doesn't include excess quotes.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.auto.value.AutoValue;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+@AutoValue
+public abstract class FhirBundleResponse {
+
+ static FhirBundleResponse.Builder builder() {
+ return new AutoValue_FhirBundleResponse.Builder();
+ }
+
+ /**
+ * String representing the metadata of the Bundle to be written. Used to pass metadata through the
Review Comment:
This comment is incorrect; please specify that it's the input FhirBundleParameter.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java:
##########
@@ -121,7 +121,7 @@ public void testFhirIO_ExecuteBundle() throws IOException {
@Test
public void testFhirIO_ExecuteBundle_parseResponse() {
Review Comment:
Can you also add getSuccessfulBundles() calls to this test too, and verify
the responses? Just verifying they're not empty is fine.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java:
##########
@@ -1492,20 +1513,23 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re
// 20X's are successes, otherwise failure.
if (statusCode / 100 == 2) {
success++;
- context.output(Write.SUCCESSFUL_BODY, entry.toString());
+ context.output(
+ SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString()));
} else {
fail++;
context.output(
- Write.FAILED_BODY,
+ FAILED_BUNDLES,
HealthcareIOError.of(
- inputBody, HealthcareHttpException.of(statusCode, entry.toString())));
+ FhirBundleResponse.of(context.element()),
+ HealthcareHttpException.of(statusCode, entry.toString())));
Review Comment:
entry.toString() -> entry.getAsString()
I find getAsString tends to be cleaner, and doesn't include excess quotes.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.auto.value.AutoValue;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+@AutoValue
+public abstract class FhirBundleResponse {
+
+ static FhirBundleResponse.Builder builder() {
+ return new AutoValue_FhirBundleResponse.Builder();
+ }
+
+ /**
+ * String representing the metadata of the Bundle to be written. Used to pass metadata through the
+ * ExecuteBundles PTransform.
+ */
+ public abstract FhirBundleParameter getFhirBundleParameter();
+
+ /** FHIR R4 bundle resource object as a string. */
Review Comment:
This comment is also wrong, right? It's the HTTP response of the request, not
the resource.
Also specify that the value varies depending on BATCH vs TRANSACTION bundles
(entry vs whole response).
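A rough sketch of the BATCH vs TRANSACTION difference mentioned above, based on the `parseResponse` hunk quoted in this review. The names `BundleKind` and `ResponseShapeSketch` are hypothetical, and the sketch assumes every batch entry succeeded (the real code routes non-2xx entries to the failed output instead).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum for illustration; the real code compares response-type strings.
enum BundleKind {
  BATCH,
  TRANSACTION
}

final class ResponseShapeSketch {
  // Returns the values that would end up in the "response" field of each output element.
  static List<String> responseValues(
      BundleKind kind, String wholeResponse, List<String> entries) {
    List<String> values = new ArrayList<>();
    switch (kind) {
      case BATCH:
        // One output per entry: the response field holds that single entry.
        values.addAll(entries);
        break;
      case TRANSACTION:
        // One output for the bundle: the response field holds the whole response.
        values.add(wholeResponse);
        break;
    }
    return values;
  }

  public static void main(String[] args) {
    List<String> entries = new ArrayList<>();
    entries.add("{\"response\":{\"status\":\"201 Created\"}}");
    entries.add("{\"response\":{\"status\":\"200 OK\"}}");
    String whole = "{\"resourceType\":\"Bundle\",\"type\":\"transaction-response\"}";

    System.out.println(responseValues(BundleKind.BATCH, whole, entries).size());
    System.out.println(responseValues(BundleKind.TRANSACTION, whole, entries).size());
  }
}
```

Documenting this distinction in the Javadoc tells callers whether to expect one element per entry or one per bundle.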
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java:
##########
@@ -1524,6 +1548,101 @@ private int parseBundleStatus(String status) {
}
}
+ /**
+ * ExecuteBundlesResult contains both successfully executed bundles and information help debugging
+ * failed executions (eg metadata & error msgs).
+ */
+ public static class ExecuteBundlesResult extends Write.AbstractResult {
+ private final Pipeline pipeline;
+ private final PCollection<FhirBundleResponse> successfulBundles;
+ private final PCollection<HealthcareIOError<FhirBundleResponse>> failedBundles;
+
+ private ExecuteBundlesResult(
+ Pipeline pipeline,
+ PCollection<FhirBundleResponse> successfulBundles,
+ PCollection<HealthcareIOError<FhirBundleResponse>> failedBundles) {
+ this.pipeline = pipeline;
+ this.successfulBundles = successfulBundles;
+ this.failedBundles = failedBundles;
+ }
+
+ /**
+ * Entry point for the ExecuteBundlesResult, storing the successful and failed bundles and their
+ * metadata.
+ */
+ public static ExecuteBundlesResult in(
+ Pipeline pipeline,
+ PCollection<FhirBundleResponse> successfulBundles,
+ PCollection<HealthcareIOError<FhirBundleResponse>> failedBundles) {
+ return new ExecuteBundlesResult(pipeline, successfulBundles, failedBundles);
+ }
+
+ @Override
+ public PCollection<String> getSuccessfulBodies() {
+ return this.successfulBundles.apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle()));
Review Comment:
Apply string coder at the end here.
Issue Time Tracking
-------------------
Worklog Id: (was: 775546)
Time Spent: 2h 50m (was: 2h 40m)
> Add support for including additional parameters to executebundle method in fhirio.
> -----------------------------------------------------------------------------------
>
> Key: BEAM-14504
> URL: https://issues.apache.org/jira/browse/BEAM-14504
> Project: Beam
> Issue Type: Improvement
> Components: io-java-healthcare
> Reporter: Fathima Mohammed
> Assignee: Fathima Mohammed
> Priority: P2
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Add FhirBundleWithMetadata in executebundles method so that we can pass
> additional information like message id.
> FhirBundleWithMetadata represents a FHIR bundle, with its metadata (e.g. HL7
> messageId) to be executed on the intermediate FHIR store.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)