rdblue commented on a change in pull request #153: [Baseline] Apply baseline 
linting to iceberg-core
URL: https://github.com/apache/incubator-iceberg/pull/153#discussion_r276797739
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/util/Tasks.java
 ##########
 @@ -385,128 +401,117 @@ public void run() {
       return !taskFailed.get();
     }
 
-    private <E extends Exception> void runTaskWithRetry(Task<I, E> task, I 
item)
-        throws E {
+    private <E extends Exception> void runTaskWithRetry(Task<I, E> task, I 
item) throws E {
       long start = System.currentTimeMillis();
       int attempt = 0;
       while (true) {
         attempt += 1;
         try {
           task.run(item);
           break;
-
         } catch (Exception e) {
-          long durationMs = System.currentTimeMillis() - start;
-          if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt 
> 1)) {
-            if (durationMs > maxDurationMs) {
-              LOG.info("Stopping retries after {} ms", durationMs);
-            }
+          if (shouldPropagate(start, attempt, e)) {
             throw e;
-          }
-
-          if (onlyRetryExceptions != null) {
-            // if onlyRetryExceptions are present, then this retries if one is 
found
-            boolean matchedRetryException = false;
-            for (Class<? extends Exception> exClass : onlyRetryExceptions) {
-              if (exClass.isInstance(e)) {
-                matchedRetryException = true;
-              }
-            }
-            if (!matchedRetryException) {
-              throw e;
-            }
-
           } else {
-            // otherwise, always retry unless one of the stop exceptions is 
found
-            for (Class<? extends Exception> exClass : stopRetryExceptions) {
-              if (exClass.isInstance(e)) {
-                throw e;
-              }
+            int delayMs = (int) Math.min(
+                minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), 
maxSleepTimeMs);
+            int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) 
(delayMs * 0.1)));
+            LOG.warn("Retrying task after failure: {}", e.getMessage(), e);
+            try {
+              TimeUnit.MILLISECONDS.sleep(delayMs + jitter);
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(ie);
             }
           }
+        }
+      }
+    }
 
-          int delayMs = (int) Math.min(
-              minSleepTimeMs * Math.pow(scaleFactor, attempt - 1),
-              maxSleepTimeMs);
-          int jitter = ThreadLocalRandom.current()
-              .nextInt(Math.max(1, (int) (delayMs * 0.1)));
+    private <E extends Exception> boolean shouldPropagate(long start, int 
attempt, E exception) {
+      long durationMs = System.currentTimeMillis() - start;
+      if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 
1)) {
+        if (durationMs > maxDurationMs) {
+          LOG.info("Stopping retries after {} ms", durationMs);
+        }
+        return true;
+      }
 
-          LOG.warn("Retrying task after failure: " + e.getMessage(), e);
+      if (onlyRetryExceptions != null) {
+        // if onlyRetryExceptions are present, then this retries if one is 
found
+        boolean matchedRetryException = false;
+        for (Class<? extends Exception> exClass : onlyRetryExceptions) {
+          if (exClass.isInstance(exception)) {
+            matchedRetryException = true;
+          }
+        }
+        if (!matchedRetryException) {
+          return true;
+        }
 
-          try {
-            TimeUnit.MILLISECONDS.sleep(delayMs + jitter);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(ie);
+      } else {
+        // otherwise, always retry unless one of the stop exceptions is found
+        for (Class<? extends Exception> exClass : stopRetryExceptions) {
+          if (exClass.isInstance(exception)) {
+            return true;
           }
         }
       }
+      return false;
     }
   }
 
-  private static Collection<Throwable> waitFor(Collection<Future<?>> futures)
-      throws Error {
+  private static Collection<Throwable> waitFor(Collection<Future<?>> futures) {
     while (true) {
-      int numFinished = 0;
-      for (Future<?> future : futures) {
-        if (future.isDone()) {
-          numFinished += 1;
-        }
-      }
-
+      long numFinished = futures.stream().filter(Future::isDone).count();
       if (numFinished == futures.size()) {
         List<Throwable> uncaught = new ArrayList<>();
-          // all of the futures are done, get any uncaught exceptions
-          for (Future<?> future : futures) {
-            try {
-              future.get();
-
-            } catch (InterruptedException e) {
-              LOG.warn("Interrupted while getting future results", e);
-              for (Throwable t : uncaught) {
-                e.addSuppressed(t);
-              }
-              Thread.currentThread().interrupt();
-              throw new RuntimeException(e);
-
-            } catch (CancellationException e) {
-              // ignore cancellations
-
-            } catch (ExecutionException e) {
-              Throwable cause = e.getCause();
-              if (Error.class.isInstance(cause)) {
-                for (Throwable t : uncaught) {
-                  cause.addSuppressed(t);
-                }
-                throw (Error) cause;
-              }
-
-              if (cause != null) {
-                uncaught.add(e);
-              }
-
-              LOG.warn("Task threw uncaught exception", cause);
-            }
-          }
-
+        // all of the futures are done, get any uncaught exceptions
+        futures.forEach(future -> 
uncaught.addAll(extractUncaughtException(future)));
         return uncaught;
-
       } else {
         try {
           Thread.sleep(10);
         } catch (InterruptedException e) {
           LOG.warn("Interrupted while waiting for tasks to finish", e);
-
-          for (Future<?> future : futures) {
-            future.cancel(true);
-          }
+          futures.forEach(future -> future.cancel(true));
 
 Review comment:
   There are already a lot of changes to this file, so I'd prefer not to make 
these cosmetic ones.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to