This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5338bdc5b Add matching of non-transient exceptions that will avoid failing the container in GMIP (#3662)
5338bdc5b is described below

commit 5338bdc5b8179ff1e647190d699edf225bfc7e77
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Mar 31 14:02:21 2023 -0700

    Add matching of non-transient exceptions that will avoid failing the container in GMIP (#3662)
    
    * Add matching of non-transient exceptions that will avoid failing the container in GMIP
    
    * Add separate variable for currently seen dataset errors
---
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 26 +++++++++++++++-------
 .../iceberg/writer/GobblinMCEWriterTest.java       |  8 +++----
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index c7a7a5481..d2d72a969 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -107,6 +107,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   public static final String GMCE_METADATA_WRITER_CLASSES = 
"gmce.metadata.writer.classes";
   public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 
"gmce.metadata.writer.max.error.dataset";
   public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = 
"gmce.metadata.writer.transient.exception.messages";
+  public static final String NON_TRANSIENT_EXCEPTION_MESSAGES_KEY = 
"gmce.metadata.writer.nonTransient.exception.messages";
   public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
   public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
   public static final String TABLE_NAME_DELIMITER = ".";
@@ -124,10 +125,12 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   private Map<String, List<HiveRegistrationUnit.Column>> partitionKeysMap;
   private Closer closer = Closer.create();
   protected final AtomicLong recordCount = new AtomicLong(0L);
+  private final Set<String> currentErrorDatasets = new HashSet<>();
   @Setter
   private int maxErrorDataset;
   protected EventSubmitter eventSubmitter;
   private final Set<String> transientExceptionMessages;
+  private final Set<String> nonTransientExceptionMessages;
 
   @AllArgsConstructor
   static class TableStatus {
@@ -161,6 +164,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     MetricContext metricContext = Instrumented.getMetricContext(state, 
this.getClass(), tags);
     eventSubmitter = new EventSubmitter.Builder(metricContext, 
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
     transientExceptionMessages = new 
HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
+    nonTransientExceptionMessages = new 
HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
   }
 
   @Override
@@ -339,7 +343,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         try {
           writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
         } catch (Exception e) {
-          if (isExceptionTransient(e, transientExceptionMessages)) {
+          if (exceptionMatches(e, transientExceptionMessages)) {
             throw new RuntimeException("Failing container due to transient 
exception for db: " + dbName + " table: " + tableName, e);
           }
           meetException = true;
@@ -390,8 +394,13 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
       
lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException)
 e).droppedPartitionValues);
     }
     this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
-    log.error(String.format("Meet exception when flush table %s", tableString), e);
-    if (datasetErrorMap.size() > maxErrorDataset) {
+    if (!exceptionMatches(e, this.nonTransientExceptionMessages)) {
+      currentErrorDatasets.add(tableStatus.datasetPath);
+      log.error(String.format("Meet exception when flush table %s", tableString), e);
+    } else {
+      log.error(String.format("Detected known non-transient failure for table %s", tableString), e);
+    }
+    if (currentErrorDatasets.size() > maxErrorDataset) {
       //Fail the job if the error size exceeds some number
       throw new IOException(String.format("Container fails to flush for more than %s dataset, last exception we met is: ", maxErrorDataset), e);
     }
@@ -412,7 +421,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         try {
           writer.flush(dbName, tableName);
         } catch (IOException e) {
-          if (isExceptionTransient(e, transientExceptionMessages)) {
+          if (exceptionMatches(e, transientExceptionMessages)) {
             throw new RuntimeException("Failing container due to transient 
exception for db: " + dbName + " table: " + tableName, e);
           }
           meetException = true;
@@ -435,11 +444,12 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   }
 
   /**
-   * Check if exception is contained within a known list of transient exceptions. These exceptions should not be caught
-   * to avoid advancing watermarks and skipping GMCEs unnecessarily.
+   * Check if exception is contained within a known list of known exceptions. Transient exceptions should not be caught
+   * to avoid advancing watermarks and skipping GMCEs unnecessarily, while non-transient exceptions should not count
+   * towards the maximum number of failed datasets.
    */
-  public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
-    return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
+  public static boolean exceptionMatches(Exception e, Set<String> exceptionMessages) {
+    return exceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
   }
 
   /**
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index 129817b5b..415a1e5c8 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -213,12 +213,12 @@ public class GobblinMCEWriterTest {
     Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException");
     IOException transientException = new IOException("test1 Filesystem closed test");
     IOException wrapperException = new IOException("wrapper exception", transientException);
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(transientException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(wrapperException, transientExceptions));
     IOException nonTransientException = new IOException("Write failed due to bad schema");
-    Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
+    Assert.assertFalse(GobblinMCEWriter.exceptionMatches(nonTransientException, transientExceptions));
     RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("");
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(rejectedExecutionException, transientExceptions));
   }
 
   @DataProvider(name="AllowMockMetadataWriter")

Reply via email to