This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5338bdc5b Add matching of non-transient exceptions that will avoid
failing the container in GMIP (#3662)
5338bdc5b is described below
commit 5338bdc5b8179ff1e647190d699edf225bfc7e77
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Mar 31 14:02:21 2023 -0700
Add matching of non-transient exceptions that will avoid failing the
container in GMIP (#3662)
* Add matching of non-transient exceptions that will avoid failing the
container in GMIP
* Add separate variable for currently seen dataset errors
---
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 26 +++++++++++++++-------
.../iceberg/writer/GobblinMCEWriterTest.java | 8 +++----
2 files changed, 22 insertions(+), 12 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index c7a7a5481..d2d72a969 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -107,6 +107,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
public static final String GMCE_METADATA_WRITER_CLASSES =
"gmce.metadata.writer.classes";
public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET =
"gmce.metadata.writer.max.error.dataset";
public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY =
"gmce.metadata.writer.transient.exception.messages";
+ public static final String NON_TRANSIENT_EXCEPTION_MESSAGES_KEY =
"gmce.metadata.writer.nonTransient.exception.messages";
public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
public static final String TABLE_NAME_DELIMITER = ".";
@@ -124,10 +125,12 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
private Map<String, List<HiveRegistrationUnit.Column>> partitionKeysMap;
private Closer closer = Closer.create();
protected final AtomicLong recordCount = new AtomicLong(0L);
+ private final Set<String> currentErrorDatasets = new HashSet<>();
@Setter
private int maxErrorDataset;
protected EventSubmitter eventSubmitter;
private final Set<String> transientExceptionMessages;
+ private final Set<String> nonTransientExceptionMessages;
@AllArgsConstructor
static class TableStatus {
@@ -161,6 +164,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
MetricContext metricContext = Instrumented.getMetricContext(state,
this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext,
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
transientExceptionMessages = new
HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
+ nonTransientExceptionMessages = new
HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
}
@Override
@@ -339,7 +343,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
try {
writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
} catch (Exception e) {
- if (isExceptionTransient(e, transientExceptionMessages)) {
+ if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient
exception for db: " + dbName + " table: " + tableName, e);
}
meetException = true;
@@ -390,8 +394,13 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException)
e).droppedPartitionValues);
}
this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
- log.error(String.format("Meet exception when flush table %s",
tableString), e);
- if (datasetErrorMap.size() > maxErrorDataset) {
+ if (!exceptionMatches(e, this.nonTransientExceptionMessages)) {
+ currentErrorDatasets.add(tableStatus.datasetPath);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ } else {
+ log.error(String.format("Detected known non-transient failure for table
%s", tableString), e);
+ }
+ if (currentErrorDatasets.size() > maxErrorDataset) {
//Fail the job if the error size exceeds some number
throw new IOException(String.format("Container fails to flush for more
than %s dataset, last exception we met is: ", maxErrorDataset), e);
}
@@ -412,7 +421,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
try {
writer.flush(dbName, tableName);
} catch (IOException e) {
- if (isExceptionTransient(e, transientExceptionMessages)) {
+ if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient
exception for db: " + dbName + " table: " + tableName, e);
}
meetException = true;
@@ -435,11 +444,12 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
}
/**
- * Check if exception is contained within a known list of transient
exceptions. These exceptions should not be caught
- * to avoid advancing watermarks and skipping GMCEs unnecessarily.
+ * Check whether the root cause of an exception contains any of the given exception messages.
Matching transient exceptions should not be caught
+ * to avoid advancing watermarks and skipping GMCEs unnecessarily, while matching
non-transient exceptions should not count
+ * towards the maximum number of failed datasets.
*/
- public static boolean isExceptionTransient(Exception e, Set<String>
transientExceptionMessages) {
- return transientExceptionMessages.stream().anyMatch(message ->
Throwables.getRootCause(e).toString().contains(message));
+ public static boolean exceptionMatches(Exception e, Set<String>
exceptionMessages) {
+ return exceptionMessages.stream().anyMatch(message ->
Throwables.getRootCause(e).toString().contains(message));
}
/**
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index 129817b5b..415a1e5c8 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -213,12 +213,12 @@ public class GobblinMCEWriterTest {
Set<String> transientExceptions = Sets.newHashSet("Filesystem closed",
"Hive timeout", "RejectedExecutionException");
IOException transientException = new IOException("test1 Filesystem closed
test");
IOException wrapperException = new IOException("wrapper exception",
transientException);
-
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException,
transientExceptions));
- Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException,
transientExceptions));
+ Assert.assertTrue(GobblinMCEWriter.exceptionMatches(transientException,
transientExceptions));
+ Assert.assertTrue(GobblinMCEWriter.exceptionMatches(wrapperException,
transientExceptions));
IOException nonTransientException = new IOException("Write failed due to
bad schema");
-
Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException,
transientExceptions));
+
Assert.assertFalse(GobblinMCEWriter.exceptionMatches(nonTransientException,
transientExceptions));
RejectedExecutionException rejectedExecutionException = new
RejectedExecutionException("");
-
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException,
transientExceptions));
+
Assert.assertTrue(GobblinMCEWriter.exceptionMatches(rejectedExecutionException,
transientExceptions));
}
@DataProvider(name="AllowMockMetadataWriter")