This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a4344  [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one 
topic failure will not affect other topics in the same container (#3419)
62a4344 is described below

commit 62a434474d96cdf63bd4913846f6ca07f8c9e5e9
Author: Zihan Li <[email protected]>
AuthorDate: Tue Nov 9 17:24:28 2021 -0800

    [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will 
not affect other topics in the same container (#3419)
    
    * [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure 
will not affect other topics in the same container
    
    * address comments
    
    * change the way we set the low watermark to better indicate the 
watermark range of the snapshot
    
    * address comments
    
    * fix test error
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    |   9 ++
 .../apache/gobblin/hive/writer/MetadataWriter.java |  10 ++
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 131 +++++++++++++++++----
 .../iceberg/writer/GobblinMetadataException.java   |  40 +++++++
 .../iceberg/writer/IcebergMetadataWriter.java      |  19 +--
 .../iceberg/writer/HiveMetadataWriterTest.java     |   5 +-
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  39 +++++-
 7 files changed, 218 insertions(+), 35 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 9ca6bb9..5ed0d47 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -123,6 +123,15 @@ public class HiveMetadataWriter implements MetadataWriter {
     }
   }
 
+  @Override
+  public void reset(String dbName, String tableName) throws IOException {
+    String tableKey = tableNameJoiner.join(dbName, tableName);
+    this.currentExecutionMap.remove(tableKey);
+    this.schemaCreationTimeMap.remove(tableKey);
+    this.latestSchemaMap.remove(tableKey);
+    this.specMaps.remove(tableKey);
+  }
+
   public void write(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> newSpecsMap,
       Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) 
throws IOException {
     String dbName = tableSpec.getTable().getDbName();
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 6333688..e9457d1 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -48,6 +48,16 @@ public interface MetadataWriter extends Closeable {
   void flush(String dbName, String tableName) throws IOException;
 
   /**
+   * If something goes wrong, we want to clean up the in-memory state for the 
table inside the writer so that we can continue
+   * registration for this table without affecting correctness
+   *
+   * @param dbName The db name of metadata-registration target.
+   * @param tableName The table name of metadata-registration target.
+   * @throws IOException
+   */
+  void reset(String dbName, String tableName) throws IOException;
+
+  /**
    * Compute and cache the metadata from the GMCE
    * @param recordEnvelope Containing {@link GobblinMetadataChangeEvent}
    * @param newSpecsMap The container (as a map) for new specs.
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index a6036e3..324ca85 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -29,9 +29,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import lombok.AllArgsConstructor;
+import lombok.Setter;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -87,12 +91,15 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = 
"metadata.parallel.runner.timeout.mills";
   public static final String HIVE_PARTITION_NAME = "hive.partition.name";
   public static final String GMCE_METADATA_WRITER_CLASSES = 
"gmce.metadata.writer.classes";
+  public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 
"gmce.metadata.writer.max.error.dataset";
+  public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
   public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
   public static final String TABLE_NAME_DELIMITER = ".";
   @Getter
   List<MetadataWriter> metadataWriters;
-  Map<String, OperationType> tableOperationTypeMap;
-  Map<String, OperationType> datasetOperationTypeMap;
+  Map<String, TableStatus> tableOperationTypeMap;
+  @Getter
+  Map<String, Map<String, GobblinMetadataException>> datasetErrorMap;
   Set<String> acceptedClusters;
   protected State state;
   private final ParallelRunner parallelRunner;
@@ -101,18 +108,30 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   private Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps;
   private Closer closer = Closer.create();
   protected final AtomicLong recordCount = new AtomicLong(0L);
+  @Setter
+  private int maxErrorDataset;
+
+  @AllArgsConstructor
+  static class TableStatus {
+    OperationType operationType;
+    String datasetPath;
+    String gmceTopicPartition;
+    long gmceLowWatermark;
+    long gmceHighWatermark;
+  }
 
   GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State 
properties) throws IOException {
     newSpecsMaps = new HashMap<>();
     oldSpecsMaps = new HashMap<>();
     metadataWriters = new ArrayList<>();
+    datasetErrorMap = new HashMap<>();
     acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, 
ClustersNames.getInstance().getClusterName());
     state = properties;
+    maxErrorDataset = 
state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, 
DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
     for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, 
IcebergMetadataWriter.class.getName())) {
       
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class,
 className, state)));
     }
     tableOperationTypeMap = new HashMap<>();
-    datasetOperationTypeMap = new HashMap<>();
     parallelRunner = closer.register(new 
ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20),
         FileSystem.get(HadoopUtils.getConfFromState(properties))));
     parallelRunnerTimeoutMills =
@@ -187,6 +206,8 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   @Override
   public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) 
throws IOException {
     GenericRecord genericRecord = recordEnvelope.getRecord();
+    CheckpointableWatermark watermark = recordEnvelope.getWatermark();
+    Preconditions.checkNotNull(watermark);
     //filter out the events that not emitted by accepted clusters
     if (!acceptedClusters.contains(genericRecord.get("cluster"))) {
       return;
@@ -197,13 +218,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     String datasetName = gmce.getDatasetIdentifier().toString();
     //remove the old hive spec cache after flush
     //Here we assume that new hive spec for one path always be the 
same(ingestion flow register to same tables)
-    if (!datasetOperationTypeMap.containsKey(datasetName)) {
-      oldSpecsMaps.remove(datasetName);
-    }
-    if (datasetOperationTypeMap.containsKey(datasetName)
-        && datasetOperationTypeMap.get(datasetName) != 
gmce.getOperationType()) {
-      datasetOperationTypeMap.put(datasetName, gmce.getOperationType());
-    }
+    oldSpecsMaps.remove(datasetName);
 
     // Mapping from URI of path of arrival files to the list of HiveSpec 
objects.
     // We assume in one same operation interval, for same dataset, the table 
property will not change to reduce the time to compute hiveSpec.
@@ -248,20 +263,93 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != 
gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
+            ((LongWatermark)watermark.getWatermark()).getValue()-1, 
((LongWatermark)watermark.getWatermark()).getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != 
gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new 
TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
+            ((LongWatermark)watermark.getWatermark()).getValue()-1, 
((LongWatermark)watermark.getWatermark()).getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = 
((LongWatermark)watermark.getWatermark()).getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap 
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          writer.reset(dbName, tableName);
+          addOrThrowException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+
+  private void addOrThrowException(Exception e, String tableString, String 
dbName, String tableName) throws IOException{
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, GobblinMetadataException> tableErrorMap = 
this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
+    if (tableErrorMap.containsKey(tableString)) {
+      tableErrorMap.get(tableString).highWatermark = 
tableStatus.gmceHighWatermark;
+    } else {
+      GobblinMetadataException gobblinMetadataException =
+          new GobblinMetadataException(tableStatus.datasetPath, dbName, 
tableName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, 
tableStatus.gmceHighWatermark, e);
+      tableErrorMap.put(tableString, gobblinMetadataException);
+    }
+    this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
+    log.error(String.format("Meet exception when flush table %s", 
tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      //Fail the job if the error size exceeds some number
+      throw new IOException(String.format("Container fails to flush for more 
than %s dataset, last exception we met is: ", maxErrorDataset), e);
+    }
+    tableOperationTypeMap.remove(tableString);
+  }
+
+  // Add fault tolerant ability and make sure we can emit GTE as desired
+  private void flush(String dbName, String tableName) throws IOException {
+    boolean meetException = false;
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, 
tableName);
+    if (tableOperationTypeMap.get(tableString).gmceLowWatermark == 
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+      // No need to flush
+      return;
+    }
+    for (MetadataWriter writer : metadataWriters) {
+      if(meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.flush(dbName, tableName);
+        } catch (IOException e) {
+          meetException = true;
+          writer.reset(dbName, tableName);
+          addOrThrowException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+    if (!meetException && 
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetPath)
+        && 
datasetErrorMap.get(tableOperationTypeMap.get(tableString).datasetPath).containsKey(tableString))
 {
+      // We only want to emit GTE when the table watermark moves. There are 
two scenarios in which the watermark moves: one is after a flush interval, when
+      // we commit the new watermark to the state store; another is here, where during 
the flush interval, we flush the table because the table operation changes.
+      // Under this condition, the error map containing this dataset means we met an 
error before this flush, but this time the flush succeeds and
+      // the watermark inside the table moves, so we want to emit GTE to 
indicate there is some data loss here
+      //todo: since we finish flush for this table once, need to emit GTE to 
indicate we miss data for this table
+      log.warn(String.format("Send GTE to indicate table flush failure for 
%s", tableString));
+    }
+  }
   /**
    * Call the metadata writers to do flush each table metadata.
    * Flush of metadata writer is the place that do real metadata
@@ -279,12 +367,11 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     log.info(String.format("start to flushing %s records", 
String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = 
Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
-      for (MetadataWriter writer : metadataWriters) {
-        writer.flush(tid.get(0), tid.get(1));
-      }
+      flush(tid.get(0), tid.get(1));
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
+    //todo: after flush, watermark will move forward, so emit error GTE here
   }
 
   @Override
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
new file mode 100644
index 0000000..90f1e63
--- /dev/null
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+
+
+public class GobblinMetadataException extends IOException {
+  public String datasetPath;
+  public String dbName;
+  public String tableName;
+  public String GMCETopicPartition;
+  public long highWatermark;
+  public long lowWatermark;
+  public Exception exception;
+  GobblinMetadataException(String datasetPath, String dbName, String 
tableName, String GMCETopicPartition, long lowWatermark, long highWatermark, 
Exception exception) {
+    super(String.format("failed to flush table %s, %s", dbName, tableName), 
exception);
+    this.datasetPath = datasetPath;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.GMCETopicPartition = GMCETopicPartition;
+    this.highWatermark = highWatermark;
+    this.lowWatermark = lowWatermark;
+  }
+}
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 28051a8..db01793 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -114,7 +115,6 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.AvroUtils;
@@ -267,10 +267,6 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     currentWatermark =
         
icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY, 
topicPartition)) ? Long.parseLong(
             
icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY, 
topicPartition))) : DEFAULT_WATERMARK;
-    if (currentWatermark != DEFAULT_WATERMARK) {
-      // set the low watermark for current snapshot
-      tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark = Optional.of(currentWatermark);
-    }
     return currentWatermark;
   }
 
@@ -861,6 +857,11 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     }
   }
 
+  @Override
+  public void reset(String dbName, String tableName) throws IOException {
+      this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  }
+
   /**
    * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit 
counts match
    * for that window (aka its is set to the beginning of next window)
@@ -1013,13 +1014,13 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
       if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName())) {
         TableIdentifier tid = 
TableIdentifier.of(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName());
         String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
-            t -> ((KafkaWatermark) 
recordEnvelope.getWatermark()).getTopicPartition().toString());
+            t -> recordEnvelope.getWatermark().getSource());
         Long currentWatermark = getAndPersistCurrentWatermark(tid, 
topicPartition);
-        Long currentOffset = ((KafkaWatermark) 
recordEnvelope.getWatermark()).getLwm().getValue();
+        Long currentOffset = 
((LongWatermark)recordEnvelope.getWatermark().getWatermark()).getValue();
 
         if (currentOffset > currentWatermark) {
-          if (currentWatermark == DEFAULT_WATERMARK) {
-            //This means we haven't register this table or the GMCE topic 
partition changed, we need to reset the low watermark
+          if (!tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark.isPresent()) {
+            //This means we haven't register this table or met some error 
before, we need to reset the low watermark
             tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark =
                 Optional.of(currentOffset - 1);
           }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 70a3a2b..d6906d3 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -177,8 +177,7 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
     state.setProp("gmce.metadata.writer.classes", 
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), 
state);
   }
-
-  @Test(dependsOnGroups={"icebergMetadataWriterTest"})
+  @Test
   public void testHiveWriteAddFileGMCE() throws IOException {
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
@@ -215,7 +214,7 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
 
   }
 
-  @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"})
+  @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"}, 
groups={"hiveMetadataWriterTest"})
   public void testHiveWriteRewriteFileGMCE() throws IOException {
     gmce.setTopicPartitionOffsetsRange(null);
     Map<String, String> registrationState = gmce.getRegistrationProperties();
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 8aba5e7..a1d9b18 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -188,7 +188,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     return state;
   }
 
-  @Test
+  @Test(dependsOnGroups={"hiveMetadataWriterTest"})
   public void testWriteAddFileGMCE() throws IOException {
     // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelop method
     // without risking running into type cast runtime error.
@@ -325,6 +325,40 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "45");
   }
 
+  @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, 
groups={"icebergMetadataWriterTest"})
+  public void testFaultTolerant() throws Exception {
+    // Set fault tolerant dataset number to be 1
+    gobblinMCEWriter.setMaxErrorDataset(1);
+    // Stop metaStore so write will fail
+    stopMetastore();
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), 
gmce);
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(51L))));
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(52L))));
+    Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
+    Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
+        .get(new File(tmpDir, 
"data/tracking/testIcebergTable").getAbsolutePath())
+        .get("hivedb.testIcebergTable").lowWatermark, 50L);
+    Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
+        .get(new File(tmpDir, 
"data/tracking/testIcebergTable").getAbsolutePath())
+        .get("hivedb.testIcebergTable").highWatermark, 52L);
+    //We should not see exception as we have fault tolerant
+    gobblinMCEWriter.flush();
+    
gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
+    GenericRecord genericGmce_differentDb = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    Assert.expectThrows(IOException.class, () -> 
gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(53L))))));
+    // restart metastore to make sure the following tests run well
+    startMetastore();
+  }
+
   @Test(dependsOnMethods={"testChangeProperty"}, 
groups={"icebergMetadataWriterTest"})
   public void testWriteAddFileGMCECompleteness() throws IOException {
     // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelop method
@@ -466,6 +500,9 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
       return Lists.newArrayList("hivedb");
     }
     protected List<String> getTableNames(Optional<String> dbPrefix, Path path) 
{
+      if (path.toString().contains("testFaultTolerant")) {
+        return Lists.newArrayList("testFaultTolerantIcebergTable");
+      }
       return Lists.newArrayList("testIcebergTable");
     }
   }

Reply via email to