This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a512f20  [GOBBLIN-1588] Send failure events for write failures when 
watermark is advanced in MCE writer (#3441)
a512f20 is described below

commit a512f20deddfdb37cfb15eca1d49e26da7ddc7d0
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jan 6 10:22:17 2022 -0800

    [GOBBLIN-1588] Send failure events for write failures when watermark is 
advanced in MCE writer (#3441)
    
    * Send failure events for write failures when watermark is advanced in MCE 
writer
    
    * Address comments
---
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 50 ++++++++++++++++++----
 .../iceberg/writer/IcebergMCEMetadataKeys.java     |  3 ++
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 38 +++++++++++++---
 3 files changed, 77 insertions(+), 14 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 324ca85..9d96eed 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -29,13 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import lombok.AllArgsConstructor;
-import lombok.Setter;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -49,7 +45,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -59,9 +57,15 @@ import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.HadoopUtils;
@@ -84,6 +88,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
 @SuppressWarnings("UnstableApiUsage")
 @Slf4j
 public class GobblinMCEWriter implements DataWriter<GenericRecord> {
+  public static final String GOBBLIN_MCE_WRITER_METRIC_NAMESPACE = 
GobblinMCEWriter.class.getCanonicalName();
   public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = 
"default.hive.registration.policy";
   public static final String FORCE_HIVE_DATABASE_NAME = 
"force.hive.database.name";
   public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
@@ -110,6 +115,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   protected final AtomicLong recordCount = new AtomicLong(0L);
   @Setter
   private int maxErrorDataset;
+  protected EventSubmitter eventSubmitter;
 
   @AllArgsConstructor
   static class TableStatus {
@@ -136,6 +142,8 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         FileSystem.get(HadoopUtils.getConfFromState(properties))));
     parallelRunnerTimeoutMills =
         state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, 
DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
+    MetricContext metricContext = Instrumented.getMetricContext(state, 
this.getClass());
+    eventSubmitter = new EventSubmitter.Builder(metricContext, 
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
   }
 
   @Override
@@ -340,14 +348,14 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         }
       }
     }
-    if (!meetException && 
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetPath)
-        && 
datasetErrorMap.get(tableOperationTypeMap.get(tableString).datasetPath).containsKey(tableString))
 {
+    String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
+    if (!meetException && datasetErrorMap.containsKey(datasetPath) && 
datasetErrorMap.get(datasetPath).containsKey(tableString)) {
      // We only want to emit GTE when the table watermark moves. There are two 
scenarios where the watermark moves: one is after one flush interval, when
      // we commit a new watermark to the state store; another is here, where 
during the flush interval we flush the table because the table operation changes.
      // Under this condition, the error map containing this dataset means we met 
an error before this flush, but this time the flush succeeded and
      // the watermark inside the table moved, so we want to emit a GTE to 
indicate there is some data loss here
-      //todo: since we finish flush for this table once, need to emit GTE to 
indicate we miss data for this table
-      log.warn(String.format("Send GTE to indicate table flush failure for 
%s", tableString));
+      submitFailureEvent(datasetErrorMap.get(datasetPath).get(tableString));
+      this.datasetErrorMap.get(datasetPath).remove(tableString);
     }
   }
   /**
@@ -371,7 +379,13 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
-    //todo: after flush, watermark will move forward, so emit error GTE here
+    // Emit events for all current errors, since the GMCE watermark will be 
advanced
+    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : 
datasetErrorMap.entrySet()) {
+      for (GobblinMetadataException exception : entry.getValue().values()) {
+        submitFailureEvent(exception);
+      }
+      entry.getValue().clear();
+    }
   }
 
   @Override
@@ -420,4 +434,22 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     }
     return tmpState;
   }
+
+  /**
+   * Submit event indicating that a specific set of GMCEs have been skipped, 
so there is a gap in the registration
+   */
+  private void submitFailureEvent(GobblinMetadataException exception) {
+    log.warn(String.format("Sending GTE to indicate table flush failure for 
%s.%s", exception.dbName, exception.tableName));
+
+    GobblinEventBuilder gobblinTrackingEvent = new 
GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
+
+    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, 
exception.datasetPath);
+    
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, 
exception.dbName);
+    
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME,
 exception.tableName);
+    
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, 
exception.GMCETopicPartition);
+    
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, 
Long.toString(exception.highWatermark));
+    
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, 
Long.toString(exception.lowWatermark));
+
+    eventSubmitter.submit(gobblinTrackingEvent);
+  }
 }
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index 310c01f..a4eb3b1 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.iceberg.writer;
 public class IcebergMCEMetadataKeys {
   public static final String METRICS_NAMESPACE_ICEBERG_WRITER = 
"IcebergWriter";
   public static final String ICEBERG_COMMIT_EVENT_NAME = 
"IcebergMetadataCommitEvent";
+  public static final String METADATA_WRITER_FAILURE_EVENT = 
"MetadataWriterFailureEvent";
   public static final String LAG_KEY_NAME = "endToEndLag";
   public static final String SNAPSHOT_KEY_NAME = "currentSnapshotId";
   public static final String MANIFEST_LOCATION = "currentManifestLocation";
@@ -31,6 +32,8 @@ public class IcebergMCEMetadataKeys {
   public static final String GMCE_HIGH_WATERMARK = "gmceHighWatermark";
   public static final String GMCE_LOW_WATERMARK = "gmceLowWatermark";
   public static final String DATASET_HDFS_PATH = "datasetHdfsPath";
+  public static final String FAILURE_EVENT_DB_NAME = "databaseName";
+  public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
 
   private IcebergMCEMetadataKeys() {
   }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 44c5514..c6e5ce5 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.iceberg.writer;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@ import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.DataMetrics;
 import org.apache.gobblin.metadata.DataOrigin;
@@ -68,6 +70,8 @@ import org.apache.gobblin.metadata.DatasetIdentifier;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
 import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
@@ -105,6 +109,8 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
   static File hourlyDataFile_1;
   static File dailyDataFile;
 
+  List<GobblinEventBuilder> eventsSent = new ArrayList<>();
+
   @AfterClass
   public void clean() throws Exception {
     gobblinMCEWriter.close();
@@ -165,6 +171,10 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
 
     _avroPartitionSchema =
         
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
+
+    gobblinMCEWriter.eventSubmitter = Mockito.mock(EventSubmitter.class);
+    Mockito.doAnswer(invocation -> eventsSent.add(invocation.getArgumentAt(0, 
GobblinEventBuilder.class)))
+        
.when(gobblinMCEWriter.eventSubmitter).submit(Mockito.any(GobblinEventBuilder.class));
   }
 
   private State getState() {
@@ -330,8 +340,12 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
   public void testFaultTolerant() throws Exception {
     // Set fault tolerant dataset number to be 1
     gobblinMCEWriter.setMaxErrorDataset(1);
-    // Stop metaStore so write will fail
-    stopMetastore();
+
+    // Add a mock writer that always throws exception so that write will fail
+    MetadataWriter mockWriter = Mockito.mock(MetadataWriter.class);
+    Mockito.doThrow(new IOException("Test 
failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any());
+    gobblinMCEWriter.metadataWriters.add(0, mockWriter);
+
     GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), 
gmce);
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
@@ -342,22 +356,36 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(52L))));
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
+    
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
         .get(new File(tmpDir, 
"data/tracking/testIcebergTable").getAbsolutePath())
         .get("hivedb.testIcebergTable").lowWatermark, 50L);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
         .get(new File(tmpDir, 
"data/tracking/testIcebergTable").getAbsolutePath())
         .get("hivedb.testIcebergTable").highWatermark, 52L);
+
+    // No events sent yet since the topic has not been flushed
+    Assert.assertEquals(eventsSent.size(), 0);
+
     //We should not see exception as we have fault tolerant
     gobblinMCEWriter.flush();
+
+    // Since this topic has been flushed, there should be an event sent for 
previous failure, and the table
+    // should be removed from the error map
+    Assert.assertEquals(eventsSent.size(), 1);
+    
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME),
 "testIcebergTable");
+    
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK),
 "50");
+    
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK),
 "52");
+    
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
 0);
+
     
gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
     GenericRecord genericGmce_differentDb = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
     Assert.expectThrows(IOException.class, () -> 
gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
-            new LongWatermark(53L))))));
-    // restart metastore to make sure followiing test runs well
-    startMetastore();
+            new LongWatermark(54L))))));
+
+    gobblinMCEWriter.metadataWriters.remove(0);
   }
 
   @Test(dependsOnMethods={"testChangeProperty"}, 
groups={"icebergMetadataWriterTest"})

Reply via email to