m-trieu commented on code in PR #30695:
URL: https://github.com/apache/beam/pull/30695#discussion_r1540231650


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java:
##########
@@ -103,37 +126,38 @@ private void executeWithDelay(long delayMs, Work work) {
   }
 
   private boolean shouldRetryLocally(String computationId, Work work, Throwable t) {
-    if (KeyTokenInvalidException.isKeyTokenInvalidException(t)) {
+    Throwable parsedException = t instanceof UserCodeException ? t.getCause() : t;
+    if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
       LOG.debug(
           "Execution of work for computation '{}' on key '{}' failed due to token expiration. "
               + "Work will not be retried locally.",
           computationId,
           work.getWorkItem().getKey().toStringUtf8());
-    } else if (WorkItemCancelledException.isWorkItemCancelledException(t)) {
+    } else if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
       LOG.debug(
           "Execution of work for computation '{}' on key '{}' failed. "
               + "Work will not be retried locally.",
           computationId,
           work.getWorkItem().getShardingKey());
     } else {
-      LastExceptionDataProvider.reportException(t);
+      LastExceptionDataProvider.reportException(parsedException);
       LOG.debug("Failed work: {}", work);
       Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
-      if (!failureReporter.reportFailure(computationId, work.getWorkItem(), t)) {
+      if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
         LOG.error(
             "Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
                 + "and Windmill indicated not to retry locally.",
             computationId,
             work.getWorkItem().getKey().toStringUtf8(),
-            t);
-      } else if (isOutOfMemoryError(t)) {
+            parsedException);
+      } else if (isOutOfMemoryError(parsedException)) {
         LOG.error(
             "Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
                 + "Work will not be retried locally. Heap dump {}.",
             computationId,
             work.getWorkItem().getKey().toStringUtf8(),
-            getHeapDumpLog(),
-            t);
+            dumpHeap(),

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to