[ 
https://issues.apache.org/jira/browse/GOBBLIN-1565?focusedWorklogId=674110&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-674110
 ]

ASF GitHub Bot logged work on GOBBLIN-1565:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Nov/21 21:42
            Start Date: 02/Nov/21 21:42
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on a change in pull request #3419:
URL: https://github.com/apache/gobblin/pull/3419#discussion_r740443463



##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{

Review comment:
       Add new line before the start of a new method.

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);

Review comment:
       This is confusing. How is datasetName different from tableString? If 
there is a 1:1 mapping between datasetName and tableString, is Map of Maps the 
right data structure here?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -187,6 +210,7 @@ public Descriptor getDataDescriptor() {
   @Override
   public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) 
throws IOException {
     GenericRecord genericRecord = recordEnvelope.getRecord();
+    KafkaStreamingExtractor.KafkaWatermark watermark = 
((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark());

Review comment:
       Looks like we are leaking details here. Ideally, we should not expose 
details of extractor implementation here. Can we use the 
CheckpointableWatermark interface instead? 

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataFlushException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+
+
+public class MetadataFlushException extends IOException {
+  public String datasetName;
+  public String dbName;
+  public String tableName;
+  public String GMCETopicName;
+  public int GMCETopicPartition;
+  public long highWatermark;
+  public long lowWatermark;
+  public Exception exception;
+  MetadataFlushException(String datasetName, String dbName, String tableName, 
String GMCETopicName,

Review comment:
       Since this is used in both write and flush, shall we call it 
GMCEException or something more generic?

##########
File path: 
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
##########
@@ -47,6 +47,16 @@
    */
   void flush(String dbName, String tableName) throws IOException;
 
+  /**
+   * If something wrong happens, we want to reset the table inside the writer 
so that we can continual

Review comment:
       continual -> continue

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));
+    }
+    tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+  }
+
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void flush(String dbName, String tableName) throws IOException {
+    boolean meetException = false;
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    if (tableOperationTypeMap.get(tableString).gmceLowWatermark == 
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+      // No need to flush
+      return;
+    }
+    for (MetadataWriter writer : metadataWriters) {
+      if(meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.flush(dbName, tableName);
+        } catch (IOException e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+    if (!meetException && 
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)

Review comment:
       Shouldn't this if condition be triggered if meetException = true? 

##########
File path: 
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
##########
@@ -47,6 +47,16 @@
    */
   void flush(String dbName, String tableName) throws IOException;
 
+  /**
+   * If something wrong happens, we want to reset the table inside the writer 
so that we can continual

Review comment:
       The meaning of reset is not quite clear from the javadoc description. Is 
it about cleaning up some in-memory state that is maintained by the pipeline 
for the table?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = 
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == 
tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, 
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, 
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset", maxErrorDataset));

Review comment:
       Should we throw the actual exception that the flush/write encountered 
for the specific dataset? 

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> 
recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), 
watermark.getTopicPartition().getTopicName(), 
watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, 
String tableName) throws IOException{

Review comment:
       Should we call this method addOrThrowException?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataFlushException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+
+
+public class MetadataFlushException extends IOException {
+  public String datasetName;

Review comment:
       Seems like there is a lot of overlap in the fields: datasetName, dbName, 
tableName, topicName, topicPartition. Can we keep only the ones needed and 
derive the rest from those fields?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 674110)
    Time Spent: 1.5h  (was: 1h 20m)

> Make GMCEWriter fault tolerant so that one topic failure will not affect 
> other topics in the same container
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1565
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1565
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> GMIP should be fault tolerant, so that one topic failure should not affect 
> other topics in the same container. One potential solution is that we can 
> catch the error and document the error dataset, if we have more than one 
> datasets have issue, that normally something wrong happening on the whole 
> job, we should fail the task, but if there is only one problematic topic, we 
> can handle the exception and let the pipeline move on and process other 
> topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to