damondouglas commented on code in PR #28137:
URL: https://github.com/apache/beam/pull/28137#discussion_r1318974007
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:
##########
@@ -196,4 +281,138 @@ public ReadStudyMetadata.Result expand(PCollection<String> input) {
TupleTagList.of(ReadStudyMetadata.ERROR_MESSAGE))));
}
}
+
+ /**
+ * Increments success and failure counters for an LRO. To be used after the LRO has completed.
+ * This function leverages the fact that the LRO metadata is always of the format: "counter": {
+ * "success": "1", "failure": "1" }
+ *
+ * @param operation LRO operation object.
+ * @param operationSuccessCounter the success counter for the operation.
+ * @param operationFailureCounter the failure counter for the operation.
+ * @param resourceSuccessCounter the success counter for individual resources in the operation.
+ * @param resourceFailureCounter the failure counter for individual resources in the operation.
+ */
+ private static void incrementLroCounters(
+ Operation operation,
+ Counter operationSuccessCounter,
+ Counter operationFailureCounter,
+ Counter resourceSuccessCounter,
+ Counter resourceFailureCounter) {
+ // Update operation counters.
+ com.google.api.services.healthcare.v1.model.Status error = operation.getError();
+ if (error == null) {
+ operationSuccessCounter.inc();
+ LOG.debug(String.format("Operation %s finished successfully.", operation.getName()));
+ } else {
+ operationFailureCounter.inc();
+ LOG.error(
+ String.format(
+ "Operation %s failed with error code: %d and message: %s.",
+ operation.getName(), error.getCode(), error.getMessage()));
+ }
+
+ // Update resource counters.
+ Map<String, Object> opMetadata = operation.getMetadata();
+ if (opMetadata.containsKey(LRO_COUNTER_KEY)) {
+ try {
+ Map<String, String> counters = (Map<String, String>) opMetadata.get(LRO_COUNTER_KEY);
+ if (counters.containsKey(LRO_SUCCESS_KEY)) {
+ resourceSuccessCounter.inc(Long.parseLong(counters.get(LRO_SUCCESS_KEY)));
+ }
+ if (counters.containsKey(LRO_FAILURE_KEY)) {
+ Long numFailures = Long.parseLong(counters.get(LRO_FAILURE_KEY));
+ resourceFailureCounter.inc(numFailures);
+ if (numFailures > 0) {
+ LOG.error("Operation " + operation.getName() + " had " + numFailures + " failures.");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("failed to increment LRO counters, error message: " + e.getMessage());
+ }
+ }
+ }
+
+ /** Deidentify DICOM resources from a DICOM store to a destination DICOM store. */
+ public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
Review Comment:
You bring up a really good point in that my recommendation for
`PTransform<PBegin, Deidentify.Result>` would break the consistency. I see we
have three decisions:
1) Keep `PTransform<PBegin, PCollection<String>>`
OR
2) Change to `PTransform<PBegin, Deidentify.Result>`
And 2) above branches into a decision whether to:
3) Change `Deidentify extends PTransform<PBegin, PCollection<String>>` to
`PTransform<PBegin, Deidentify.Result>`
I'm going to advocate for the following:
- This PR implements 2) above.
- Create a new issue that tracks a proposal to do 3)
- Add a comment in this PR: `// TODO: change signature to PTransform<PBegin, Deidentify.Result>. See: <URL to GitHub issue>`
The reason for 2) is that whenever we anticipate an error that is caught and
thrown in a ProcessElement method, I'd argue that we should instead emit it
into a dead letter / error PCollection, following the
https://beam.apache.org/documentation/programming-guide/#additional-outputs
pattern.
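
To illustrate the idea only, here is a minimal plain-Java sketch of routing failures to a separate error output instead of throwing. It deliberately uses no Beam types so it can run on its own. The names `DeadLetterSketch`, `Result`, and `parseAll` are hypothetical and are not part of this PR or of Beam. In a real transform the two lists would be the main and error PCollections described in the additional-outputs section of the programming guide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java analogy of the dead letter pattern; no Beam types are used. */
final class DeadLetterSketch {

  /** Hypothetical stand-in for a Result type that carries both outputs. */
  static final class Result {
    final List<Long> parsed = new ArrayList<>(); // main output
    final List<String> errors = new ArrayList<>(); // dead letter / error output
  }

  /** Parses each input; a failure is recorded in the error output rather than thrown. */
  static Result parseAll(List<String> inputs) {
    Result result = new Result();
    for (String input : inputs) {
      try {
        result.parsed.add(Long.parseLong(input));
      } catch (NumberFormatException e) {
        // Keep the failing element and the reason so a downstream consumer can inspect it.
        result.errors.add(input + ": " + e.getMessage());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Result r = parseAll(Arrays.asList("1", "oops", "3"));
    System.out.println(r.parsed.size() + " parsed, " + r.errors.size() + " failed");
  }
}
```

The point of returning a `Result` rather than a single collection is that callers can decide what to do with the failures (log, retry, write to a sink) without one bad element failing the whole step.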