This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3f22465 Add/fix some fields of MetadataWriterFailureEvent (#3485)
3f22465 is described below
commit 3f2246575e63538bccf24c83756bd45dabf82116
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Mar 29 14:16:32 2022 -0700
Add/fix some fields of MetadataWriterFailureEvent (#3485)
---
.../org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java | 11 +++++++++--
.../apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java | 2 ++
.../apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 3 +--
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 86d0b5b..4dcefe7 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -62,6 +62,7 @@ import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
@@ -142,7 +143,10 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS,
DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
- MetricContext metricContext = Instrumented.getMetricContext(state,
this.getClass());
+ List<Tag<?>> tags = Lists.newArrayList();
+ String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+ tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME,
clusterIdentifier));
+ MetricContext metricContext = Instrumented.getMetricContext(state,
this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext,
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
}
@@ -446,9 +450,12 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH,
exception.datasetPath);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME,
exception.dbName);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME,
exception.tableName);
-
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION,
exception.GMCETopicPartition);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME,
exception.GMCETopicPartition.split("-")[0]);
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION,
exception.GMCETopicPartition.split("-")[1]);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK,
Long.toString(exception.highWatermark));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK,
Long.toString(exception.lowWatermark));
+ String message = exception.getCause() == null ? exception.getMessage() :
exception.getCause().getMessage();
+
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME,
message);
eventSubmitter.submit(gobblinTrackingEvent);
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index a4eb3b1..f925b36 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -34,6 +34,8 @@ public class IcebergMCEMetadataKeys {
public static final String DATASET_HDFS_PATH = "datasetHdfsPath";
public static final String FAILURE_EVENT_DB_NAME = "databaseName";
public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
+ public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
+ public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
private IcebergMCEMetadataKeys() {
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index df5739e..54e4406 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -144,7 +144,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private static final String ICEBERG_REGISTRATION_BLACKLIST =
"iceberg.registration.blacklist";
private static final String ICEBERG_REGISTRATION_WHITELIST =
"iceberg.registration.whitelist";
private static final String ICEBERG_METADATA_FILE_PERMISSION =
"iceberg.metadata.file.permission";
- private static final String CLUSTER_IDENTIFIER_KEY_NAME =
"clusterIdentifier";
private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
private static final String SCHEMA_CREATION_TIME_KEY =
"schema.creation.time";
private static final String ADDED_FILES_CACHE_EXPIRING_TIME =
"added.files.cache.expiring.time";
@@ -200,7 +199,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
tableCurrentWatermarkMap = new HashMap<>();
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
- tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+ tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME,
clusterIdentifier));
metricContext = closer.register(
GobblinMetricsRegistry.getInstance().getMetricContext(state,
IcebergMetadataWriter.class, tags));
this.eventSubmitter =