[ 
https://issues.apache.org/jira/browse/GOBBLIN-1565?focusedWorklogId=675703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-675703
 ]

ASF GitHub Bot logged work on GOBBLIN-1565:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Nov/21 01:19
            Start Date: 04/Nov/21 01:19
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on a change in pull request #3419:
URL: https://github.com/apache/gobblin/pull/3419#discussion_r742452232



##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));

Review comment:
       I print out the exception also we will emit GTE for those exception. But 
will update the exception to contain the last exception we met

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));
+    }
+    tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+  }
+
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void flush(String dbName, String tableName) throws IOException {
+    boolean meetException = false;
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    if (tableOperationTypeMap.get(tableString).gmceLowWatermark == 
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+      // No need to flush
+      return;
+    }
+    for (MetadataWriter writer : metadataWriters) {
+      if(meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.flush(dbName, tableName);
+        } catch (IOException e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+    if (!meetException && 
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)

Review comment:
       We want to emit GTE only when the actual watermark moves. it's either in 
the flush() method where watermark in state store move or we successfully 
commit one snapshot so that the high watermark in the iceberg table property 
moves. That's why the condition here is meetException is false as this means we 
successfully commit once for this table after we met exception. Another place 
we will emit GTE is at the end of the flush() (not flush(db,table)) method. 

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);

Review comment:
       Sorry for that confusing, I was using datasetName to be consistent with 
the metadata schema where they treat path as datasetName. I will change 
datasetName to datasetPath to avoid this confusion. It's not a 1:1 mapping, as 
one dataset can be register into multi tables

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));

Review comment:
       I print out the exception also we will emit GTE for those exception. But 
will update the exception to contain the last exception we met

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));
+    }
+    tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+  }
+
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void flush(String dbName, String tableName) throws IOException {
+    boolean meetException = false;
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    if (tableOperationTypeMap.get(tableString).gmceLowWatermark == 
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+      // No need to flush
+      return;
+    }
+    for (MetadataWriter writer : metadataWriters) {
+      if(meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.flush(dbName, tableName);
+        } catch (IOException e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+    if (!meetException && 
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)

Review comment:
       We want to emit GTE only when the actual watermark moves. it's either in 
the flush() method where watermark in state store move or we successfully 
commit one snapshot so that the high watermark in the iceberg table property 
moves. That's why the condition here is meetException is false as this means we 
successfully commit once for this table after we met exception. Another place 
we will emit GTE is at the end of the flush() (not flush(db,table)) method. 

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);

Review comment:
       Sorry for that confusing, I was using datasetName to be consistent with 
the metadata schema where they treat path as datasetName. I will change 
datasetName to datasetPath to avoid this confusion. It's not a 1:1 mapping, as 
one dataset can be register into multi tables




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 675703)
    Time Spent: 1h 40m  (was: 1.5h)

> Make GMCEWriter fault tolerant so that one topic failure will not affect 
> other topics in the same container
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1565
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1565
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> GMIP should be fault tolerant, so that one topic failure should not affect 
> other topics in the same container. One potential solution is that we can 
> catch the error and document the error dataset, if we have more than one 
> datasets have issue, that normally something wrong happening on the whole 
> job, we should fail the task, but if there is only one problematic topic, we 
> can handle the exception and let the pipeline move on and process other 
> topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to