This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 48139492b [GOBBLIN-1842] Add timers to GobblinMCEWriter (#3703)
48139492b is described below

commit 48139492b3d27c8514e00c3dd2c954a0cac72cd7
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Jun 28 10:19:43 2023 -0700

    [GOBBLIN-1842] Add timers to GobblinMCEWriter (#3703)
    
    * Add timers to GobblinMCEWriter
    
    * Add dataset level timers and more logs in flush
    
    * Fix unit tests
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 10 ++-
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 89 ++++++++++++++++------
 .../iceberg/writer/IcebergMetadataWriter.java      | 18 +++--
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  4 +
 4 files changed, 89 insertions(+), 32 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 8400dd94b..02c0c5895 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.hive.writer;
 
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -160,8 +161,9 @@ public class HiveMetadataWriter implements MetadataWriter {
       HashMap<List<String>, ListenableFuture<Void>> executionMap = 
this.currentExecutionMap.get(tableKey);
       //iterator all execution to get the result to make sure they all 
succeeded
       for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : 
executionMap.entrySet()) {
-        try {
+        try (Timer.Context context = new Timer().time()) {
           execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+          log.info("Time taken to add partition to table {} is {} ms", 
tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
         } catch (TimeoutException e) {
           // Since TimeoutException should always be a transient issue, throw 
RuntimeException which will fail/retry container
           throw new RuntimeException("Timeout waiting for result of 
registration for table " + tableKey, e);
@@ -177,7 +179,11 @@ public class HiveMetadataWriter implements MetadataWriter {
         if (cache != null) {
           HiveSpec hiveSpec = cache.getIfPresent(execution.getKey());
           if (hiveSpec != null) {
-            eventSubmitter.submit(buildCommitEvent(dbName, tableName, 
execution.getKey(), hiveSpec, HivePartitionOperation.ADD_OR_MODIFY));
+            try (Timer.Context context = new Timer().time()) {
+              eventSubmitter.submit(buildCommitEvent(dbName, tableName, 
execution.getKey(), hiveSpec,
+                  HivePartitionOperation.ADD_OR_MODIFY));
+              log.info("Time taken to submit event for table {} is {} ms", 
tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+            }
           }
         }
       }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index d2d72a969..559874056 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.commons.collections.CollectionUtils;
+
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 
 import com.google.common.base.Joiner;
@@ -69,6 +71,7 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -128,9 +131,17 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   private final Set<String> currentErrorDatasets = new HashSet<>();
   @Setter
   private int maxErrorDataset;
+  @VisibleForTesting
+  public final MetricContext metricContext;
   protected EventSubmitter eventSubmitter;
   private final Set<String> transientExceptionMessages;
   private final Set<String> nonTransientExceptionMessages;
+  @VisibleForTesting
+  public final Map<String, ContextAwareTimer> metadataWriterWriteTimers = new 
HashMap<>();
+  @VisibleForTesting
+  public final Map<String, ContextAwareTimer> metadataWriterFlushTimers = new 
HashMap<>();
+  private final ContextAwareTimer hiveSpecComputationTimer;
+  private final Map<String, ContextAwareTimer> datasetTimers = new HashMap<>();
 
   @AllArgsConstructor
   static class TableStatus {
@@ -150,19 +161,22 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, 
ClustersNames.getInstance().getClusterName());
     state = properties;
     maxErrorDataset = 
state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, 
DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
+    List<Tag<?>> tags = Lists.newArrayList();
+    String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+    tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, 
clusterIdentifier));
+    metricContext = Instrumented.getMetricContext(state, this.getClass(), 
tags);
+    eventSubmitter = new EventSubmitter.Builder(metricContext, 
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
     for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, 
IcebergMetadataWriter.class.getName())) {
       
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class,
 className, state)));
+      metadataWriterWriteTimers.put(className, 
metricContext.contextAwareTimer(className + ".write", 1, TimeUnit.HOURS));
+      metadataWriterFlushTimers.put(className, 
metricContext.contextAwareTimer(className + ".flush", 1, TimeUnit.HOURS));
     }
+    hiveSpecComputationTimer = 
metricContext.contextAwareTimer("hiveSpec.computation", 1, TimeUnit.HOURS);
     tableOperationTypeMap = new HashMap<>();
     parallelRunner = closer.register(new 
ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20),
         FileSystem.get(HadoopUtils.getConfFromState(properties))));
     parallelRunnerTimeoutMills =
         state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, 
DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
-    List<Tag<?>> tags = Lists.newArrayList();
-    String clusterIdentifier = ClustersNames.getInstance().getClusterName();
-    tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, 
clusterIdentifier));
-    MetricContext metricContext = Instrumented.getMetricContext(state, 
this.getClass(), tags);
-    eventSubmitter = new EventSubmitter.Builder(metricContext, 
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
     transientExceptionMessages = new 
HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
     nonTransientExceptionMessages = new 
HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
   }
@@ -187,26 +201,28 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
    */
   private void computeSpecMap(List<String> files, ConcurrentHashMap<String, 
Collection<HiveSpec>> specsMap,
       Cache<String, Collection<HiveSpec>> cache, State registerState, boolean 
isPrefix) throws IOException {
-    HiveRegistrationPolicy policy = 
HiveRegistrationPolicyBase.getPolicy(registerState);
-    for (String file : files) {
-      parallelRunner.submitCallable(new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          try {
-            Path regPath = isPrefix ? new Path(file) : new 
Path(file).getParent();
-            //Use raw path to comply with HDFS federation setting.
-            Path rawPath = new Path(regPath.toUri().getRawPath());
-            specsMap.put(regPath.toString(), cache.get(regPath.toString(), () 
-> policy.getHiveSpecs(rawPath)));
-          } catch (Throwable e) {
-            //todo: Emit failed GMCE in the future to easily track the error 
gmce and investigate the reason for that.
-            log.warn("Cannot get Hive Spec for {} using policy {} due to:", 
file, policy.toString());
-            log.warn(e.getMessage());
+    try (Timer.Context context = hiveSpecComputationTimer.time()) {
+      HiveRegistrationPolicy policy = 
HiveRegistrationPolicyBase.getPolicy(registerState);
+      for (String file : files) {
+        parallelRunner.submitCallable(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            try {
+              Path regPath = isPrefix ? new Path(file) : new 
Path(file).getParent();
+              //Use raw path to comply with HDFS federation setting.
+              Path rawPath = new Path(regPath.toUri().getRawPath());
+              specsMap.put(regPath.toString(), cache.get(regPath.toString(), 
() -> policy.getHiveSpecs(rawPath)));
+            } catch (Throwable e) {
+              //todo: Emit failed GMCE in the future to easily track the error 
gmce and investigate the reason for that.
+              log.warn("Cannot get Hive Spec for {} using policy {} due to:", 
file, policy.toString());
+              log.warn(e.getMessage());
+            }
+            return null;
           }
-          return null;
-        }
-      }, file);
+        }, file);
+      }
+      parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
     }
-    parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
   }
 
   @Override
@@ -341,7 +357,12 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         writer.reset(dbName, tableName);
       } else {
         try {
-          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+          Timer writeTimer = 
metadataWriterWriteTimers.get(writer.getClass().getName());
+          Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k -> 
metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS));
+          try (Timer.Context writeContext = writeTimer.time();
+              Timer.Context datasetContext = datasetTimer.time()) {
+            writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, 
spec);
+          }
         } catch (Exception e) {
           if (exceptionMatches(e, transientExceptionMessages)) {
             throw new RuntimeException("Failing container due to transient 
exception for db: " + dbName + " table: " + tableName, e);
@@ -419,7 +440,12 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
         writer.reset(dbName, tableName);
       } else {
         try {
-          writer.flush(dbName, tableName);
+          Timer flushTimer = 
metadataWriterFlushTimers.get(writer.getClass().getName());
+          Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k -> 
metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS));
+          try (Timer.Context flushContext = flushTimer.time();
+              Timer.Context datasetContext = datasetTimer.time()) {
+            writer.flush(dbName, tableName);
+          }
         } catch (IOException e) {
           if (exceptionMatches(e, transientExceptionMessages)) {
             throw new RuntimeException("Failing container due to transient 
exception for db: " + dbName + " table: " + tableName, e);
@@ -480,6 +506,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
       }
       entry.getValue().clear();
     }
+    logTimers();
   }
 
   @Override
@@ -565,4 +592,16 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
     List<MetadataWriter> failedWriters = 
metadataWriters.subList(metadataWriters.indexOf(failedWriter), 
metadataWriters.size());
     return failedWriters.stream().map(writer -> 
writer.getClass().getName()).collect(Collectors.toList());
   }
+
+  private void logTimers() { // logs every timer this writer holds: hiveSpec computation, per-writer write/flush, per-dataset
+    logTimer(hiveSpecComputationTimer);
+    metadataWriterWriteTimers.values().forEach(this::logTimer);
+    metadataWriterFlushTimers.values().forEach(this::logTimer);
+    datasetTimers.values().forEach(this::logTimer);
+  }
+
+  private void logTimer(ContextAwareTimer timer) { // take ONE snapshot so mean and p99 are computed from the same samples
+    com.codahale.metrics.Snapshot snapshot = timer.getSnapshot();
+    log.info("Timer {} 1 hour mean duration: {} ms, 99th percentile duration: {} ms", timer.getName(), TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()), TimeUnit.NANOSECONDS.toMillis((long) snapshot.get99thPercentile()));
+  }
 }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ecd528323..57a28f778 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -827,7 +827,10 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         String topicName = getTopicName(tid, tableMetadata);
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
-          sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+          try (Timer.Context context = new Timer().time()) {
+            sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+            log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+          }
           if (tableMetadata.completenessEnabled) {
             checkAndUpdateCompletenessWatermark(tableMetadata, topicName, 
tableMetadata.datePartitions, props);
           }
@@ -870,20 +873,25 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         UpdateProperties updateProperties = transaction.updateProperties();
         props.forEach(updateProperties::set);
         updateProperties.commit();
-        try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName)) {
+        try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
+            Timer.Context context = new Timer().time()) {
           transaction.commitTransaction();
+          log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
         }
 
         // Emit GTE for snapshot commits
         Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
         Map<String, String> currentProps = 
tableMetadata.table.get().properties();
-        submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, 
currentProps, highWatermark);
+        try (Timer.Context context = new Timer().time()) {
+          submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, 
tableName, currentProps, highWatermark);
+          log.info("Sending snapshot commit event for {} took {} ms", 
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+        }
 
         //Reset the table metadata for next accumulation period
         tableMetadata.reset(currentProps, highWatermark);
-        log.info(String.format("Finish commit of new snapshot %s for table 
%s", snapshot.snapshotId(), tid.toString()));
+        log.info(String.format("Finish commit of new snapshot %s for table 
%s", snapshot.snapshotId(), tid));
       } else {
-        log.info("There's no transaction initiated for the table {}", 
tid.toString());
+        log.info("There's no transaction initiated for the table {}", tid);
       }
     } catch (RuntimeException e) {
       throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index bf7dcb383..e5819d6ba 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -348,6 +348,10 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     MetadataWriter mockWriter = Mockito.mock(MetadataWriter.class);
     Mockito.doThrow(new IOException("Test 
failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any());
     gobblinMCEWriter.metadataWriters.add(0, mockWriter);
+    
gobblinMCEWriter.metadataWriterWriteTimers.put(mockWriter.getClass().getName(), 
gobblinMCEWriter.metricContext
+        .contextAwareTimer(mockWriter.getClass().getName() + ".write", 1, 
TimeUnit.HOURS));
+    
gobblinMCEWriter.metadataWriterFlushTimers.put(mockWriter.getClass().getName(), 
gobblinMCEWriter.metricContext
+        .contextAwareTimer(mockWriter.getClass().getName() + ".flush", 1, 
TimeUnit.HOURS));
 
     GobblinMetadataChangeEvent gmceWithMockWriter = 
SpecificData.get().deepCopy(gmce.getSchema(), gmce);
     
gmceWithMockWriter.setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName(),
 mockWriter.getClass().getName()));

Reply via email to