rdblue commented on a change in pull request #153: [Baseline] Apply baseline linting to iceberg-core
URL: https://github.com/apache/incubator-iceberg/pull/153#discussion_r276801272
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/util/Tasks.java
 ##########
 @@ -197,80 +199,94 @@ public boolean run(Task<I, RuntimeException> task) {
       Iterator<I> iterator = items.iterator();
       boolean threw = true;
       try {
+        tryRunTasksSingleThreaded(task, succeeded, exceptions, iterator);
+        threw = false;
+      } finally {
+        handleSingleThreadedTaskFailures(succeeded, exceptions, iterator, threw);
+      }
+
+      if (throwFailureWhenFinished && !exceptions.isEmpty()) {
+        Tasks.throwOne(exceptions, exceptionClass);
+      } else if (throwFailureWhenFinished && threw) {
+        throw new RuntimeException(
+            "Task set failed with an uncaught throwable");
+      }
+
+      return !threw;
+    }
+
+    private void handleSingleThreadedTaskFailures(
+        List<I> succeeded, List<Throwable> exceptions, Iterator<I> iterator, boolean threw) {
+      // threw handles exceptions that were *not* caught by the catch block,
+      // and exceptions that were caught and possibly handled by onFailure
+      // are kept in exceptions.
+      if (threw || !exceptions.isEmpty()) {
+        revertSingleThreadedTaskIfNecessary(succeeded);
+        abortSingleThreadedTaskIfNecessary(iterator);
+      }
+    }
+
+    private void abortSingleThreadedTaskIfNecessary(Iterator<I> iterator) {
+      if (abortTask != null) {
+        boolean failed = false;
         while (iterator.hasNext()) {
-          I item = iterator.next();
           try {
-            runTaskWithRetry(task, item);
-            succeeded.add(item);
-
+            abortTask.run(iterator.next());
           } catch (Exception e) {
-            exceptions.add(e);
-
-            if (onFailure != null) {
-              try {
-                onFailure.run(item, e);
-              } catch (Exception failException) {
-                e.addSuppressed(failException);
-                LOG.error("Failed to clean up on failure", e);
-                // keep going
-              }
-            }
+            failed = true;
+            LOG.error("Failed to abort task", e);
+            // keep going
+          }
+          if (stopAbortsOnFailure && failed) {
+            break;
+          }
+        }
+      }
+    }
 
-            if (stopOnFailure) {
-              break;
-            }
+    private void revertSingleThreadedTaskIfNecessary(List<I> succeeded) {
+      if (revertTask != null) {
+        boolean failed = false;
+        for (I item : succeeded) {
+          try {
+            revertTask.run(item);
+          } catch (Exception e) {
+            failed = true;
+            LOG.error("Failed to revert task", e);
+            // keep going
+          }
+          if (stopRevertsOnFailure && failed) {
+            break;
           }
         }
+      }
+    }
 
-        threw = false;
+    private <E extends Exception> void tryRunTasksSingleThreaded(
+        Task<I, E> task, List<I> succeeded, List<Throwable> exceptions, Iterator<I> iterator) {
 
 Review comment:
   I'm not a big fan of out variables passed into methods like this. It's better to return the exceptions that were thrown.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to