This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a512f20 [GOBBLIN-1588] Send failure events for write failures when
watermark is advanced in MCE writer (#3441)
a512f20 is described below
commit a512f20deddfdb37cfb15eca1d49e26da7ddc7d0
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jan 6 10:22:17 2022 -0800
[GOBBLIN-1588] Send failure events for write failures when watermark is
advanced in MCE writer (#3441)
* Send failure events for write failures when watermark is advanced in MCE
writer
* Address comments
---
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 50 ++++++++++++++++++----
.../iceberg/writer/IcebergMCEMetadataKeys.java | 3 ++
.../iceberg/writer/IcebergMetadataWriterTest.java | 38 +++++++++++++---
3 files changed, 77 insertions(+), 14 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 324ca85..9d96eed 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -29,13 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.AllArgsConstructor;
-import lombok.Setter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -49,7 +45,9 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
+import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -59,9 +57,15 @@ import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
@@ -84,6 +88,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
@SuppressWarnings("UnstableApiUsage")
@Slf4j
public class GobblinMCEWriter implements DataWriter<GenericRecord> {
+ public static final String GOBBLIN_MCE_WRITER_METRIC_NAMESPACE =
GobblinMCEWriter.class.getCanonicalName();
public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY =
"default.hive.registration.policy";
public static final String FORCE_HIVE_DATABASE_NAME =
"force.hive.database.name";
public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
@@ -110,6 +115,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
protected final AtomicLong recordCount = new AtomicLong(0L);
@Setter
private int maxErrorDataset;
+ protected EventSubmitter eventSubmitter;
@AllArgsConstructor
static class TableStatus {
@@ -136,6 +142,8 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS,
DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
+ MetricContext metricContext = Instrumented.getMetricContext(state,
this.getClass());
+ eventSubmitter = new EventSubmitter.Builder(metricContext,
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
}
@Override
@@ -340,14 +348,14 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
}
}
}
- if (!meetException &&
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetPath)
- &&
datasetErrorMap.get(tableOperationTypeMap.get(tableString).datasetPath).containsKey(tableString))
{
+ String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
+ if (!meetException && datasetErrorMap.containsKey(datasetPath) &&
datasetErrorMap.get(datasetPath).containsKey(tableString)) {
// We only want to emit GTE when the table watermark moves. There are
two scenarios where the watermark moves: one is after one flush interval,
// when we commit a new watermark to the state store; another is here, where
during the flush interval, we flush the table because the table operation changes.
// Under this condition, the error map containing this dataset means we met an
error before this flush, but this time the flush succeeded and
// the watermark inside the table moved, so we want to emit a GTE to
indicate there is some data loss here
- //todo: since we finish flush for this table once, need to emit GTE to
indicate we miss data for this table
- log.warn(String.format("Send GTE to indicate table flush failure for
%s", tableString));
+ submitFailureEvent(datasetErrorMap.get(datasetPath).get(tableString));
+ this.datasetErrorMap.get(datasetPath).remove(tableString);
}
}
/**
@@ -371,7 +379,13 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
}
tableOperationTypeMap.clear();
recordCount.lazySet(0L);
- //todo: after flush, watermark will move forward, so emit error GTE here
+ // Emit events for all current errors, since the GMCE watermark will be
advanced
+ for (Map.Entry<String, Map<String, GobblinMetadataException>> entry :
datasetErrorMap.entrySet()) {
+ for (GobblinMetadataException exception : entry.getValue().values()) {
+ submitFailureEvent(exception);
+ }
+ entry.getValue().clear();
+ }
}
@Override
@@ -420,4 +434,22 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
}
return tmpState;
}
+
+ /**
+ * Submit an event indicating that a specific set of GMCEs has been skipped,
so there is a gap in the registration
+ */
+ private void submitFailureEvent(GobblinMetadataException exception) {
+ log.warn(String.format("Sending GTE to indicate table flush failure for
%s.%s", exception.dbName, exception.tableName));
+
+ GobblinEventBuilder gobblinTrackingEvent = new
GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
+
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH,
exception.datasetPath);
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME,
exception.dbName);
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME,
exception.tableName);
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION,
exception.GMCETopicPartition);
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK,
Long.toString(exception.highWatermark));
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK,
Long.toString(exception.lowWatermark));
+
+ eventSubmitter.submit(gobblinTrackingEvent);
+ }
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index 310c01f..a4eb3b1 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.iceberg.writer;
public class IcebergMCEMetadataKeys {
public static final String METRICS_NAMESPACE_ICEBERG_WRITER =
"IcebergWriter";
public static final String ICEBERG_COMMIT_EVENT_NAME =
"IcebergMetadataCommitEvent";
+ public static final String METADATA_WRITER_FAILURE_EVENT =
"MetadataWriterFailureEvent";
public static final String LAG_KEY_NAME = "endToEndLag";
public static final String SNAPSHOT_KEY_NAME = "currentSnapshotId";
public static final String MANIFEST_LOCATION = "currentManifestLocation";
@@ -31,6 +32,8 @@ public class IcebergMCEMetadataKeys {
public static final String GMCE_HIGH_WATERMARK = "gmceHighWatermark";
public static final String GMCE_LOW_WATERMARK = "gmceLowWatermark";
public static final String DATASET_HDFS_PATH = "datasetHdfsPath";
+ public static final String FAILURE_EVENT_DB_NAME = "databaseName";
+ public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
private IcebergMCEMetadataKeys() {
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 44c5514..c6e5ce5 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.iceberg.writer;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@ import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.DataMetrics;
import org.apache.gobblin.metadata.DataOrigin;
@@ -68,6 +70,8 @@ import org.apache.gobblin.metadata.DatasetIdentifier;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
@@ -105,6 +109,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
static File hourlyDataFile_1;
static File dailyDataFile;
+ List<GobblinEventBuilder> eventsSent = new ArrayList<>();
+
@AfterClass
public void clean() throws Exception {
gobblinMCEWriter.close();
@@ -165,6 +171,10 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
_avroPartitionSchema =
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
+
+ gobblinMCEWriter.eventSubmitter = Mockito.mock(EventSubmitter.class);
+ Mockito.doAnswer(invocation -> eventsSent.add(invocation.getArgumentAt(0,
GobblinEventBuilder.class)))
+
.when(gobblinMCEWriter.eventSubmitter).submit(Mockito.any(GobblinEventBuilder.class));
}
private State getState() {
@@ -330,8 +340,12 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
public void testFaultTolerant() throws Exception {
// Set fault tolerant dataset number to be 1
gobblinMCEWriter.setMaxErrorDataset(1);
- // Stop metaStore so write will fail
- stopMetastore();
+
+ // Add a mock writer that always throws exception so that write will fail
+ MetadataWriter mockWriter = Mockito.mock(MetadataWriter.class);
+ Mockito.doThrow(new IOException("Test
failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any());
+ gobblinMCEWriter.metadataWriters.add(0, mockWriter);
+
GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
@@ -342,22 +356,36 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(52L))));
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
+
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
.get(new File(tmpDir,
"data/tracking/testIcebergTable").getAbsolutePath())
.get("hivedb.testIcebergTable").lowWatermark, 50L);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
.get(new File(tmpDir,
"data/tracking/testIcebergTable").getAbsolutePath())
.get("hivedb.testIcebergTable").highWatermark, 52L);
+
+ // No events sent yet since the topic has not been flushed
+ Assert.assertEquals(eventsSent.size(), 0);
+
//We should not see exception as we have fault tolerant
gobblinMCEWriter.flush();
+
+ // Since this topic has been flushed, there should be an event sent for
previous failure, and the table
+ // should be removed from the error map
+ Assert.assertEquals(eventsSent.size(), 1);
+
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME),
"testIcebergTable");
+
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK),
"50");
+
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK),
"52");
+
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
0);
+
gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
GenericRecord genericGmce_differentDb =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
Assert.expectThrows(IOException.class, () ->
gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
- new LongWatermark(53L))))));
- // restart metastore to make sure followiing test runs well
- startMetastore();
+ new LongWatermark(54L))))));
+
+ gobblinMCEWriter.metadataWriters.remove(0);
}
@Test(dependsOnMethods={"testChangeProperty"},
groups={"icebergMetadataWriterTest"})