This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 05aa41c61 Use root cause for checking if exception is transient (#3585)
05aa41c61 is described below

commit 05aa41c61f3952362d84cfb4200ef2c361b9c748
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Oct 21 16:06:09 2022 -0700

    Use root cause for checking if exception is transient (#3585)
---
 .../java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java   | 2 +-
 .../org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java    | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 6773b7cad..5f77f14cf 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -439,7 +439,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
    * to avoid advancing watermarks and skipping GMCEs unnecessarily.
    */
   public static boolean isExceptionTransient(Exception e, Set<String> 
transientExceptionMessages) {
-    return transientExceptionMessages.stream().anyMatch(message -> 
e.getMessage().contains(message));
+    return transientExceptionMessages.stream().anyMatch(message -> 
Throwables.getRootCause(e).toString().contains(message));
   }
 
   /**
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index e1c85f580..820fbe684 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.function.BiConsumer;
 import lombok.SneakyThrows;
 import org.apache.gobblin.configuration.State;
@@ -217,11 +218,15 @@ public class GobblinMCEWriterTest extends 
PowerMockTestCase {
 
   @Test
   public void testDetectTransientException() {
-    Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", 
"Hive timeout");
+    Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", 
"Hive timeout", "RejectedExecutionException");
     IOException transientException = new IOException("test1 Filesystem closed 
test");
+    IOException wrapperException = new IOException("wrapper exception", 
transientException);
     
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, 
transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, 
transientExceptions));
     IOException nonTransientException = new IOException("Write failed due to 
bad schema");
     
Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, 
transientExceptions));
+    RejectedExecutionException rejectedExecutionException = new 
RejectedExecutionException("");
+    
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException,
 transientExceptions));
   }
 
   @DataProvider(name="AllowMockMetadataWriter")

Reply via email to