This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 48139492b [GOBBLIN-1842] Add timers to GobblinMCEWriter (#3703)
48139492b is described below
commit 48139492b3d27c8514e00c3dd2c954a0cac72cd7
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Jun 28 10:19:43 2023 -0700
[GOBBLIN-1842] Add timers to GobblinMCEWriter (#3703)
* Add timers to GobblinMCEWriter
* Add dataset level timers and more logs in flush
* Fix unit tests
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 10 ++-
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 89 ++++++++++++++++------
.../iceberg/writer/IcebergMetadataWriter.java | 18 +++--
.../iceberg/writer/IcebergMetadataWriterTest.java | 4 +
4 files changed, 89 insertions(+), 32 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 8400dd94b..02c0c5895 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.hive.writer;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -160,8 +161,9 @@ public class HiveMetadataWriter implements MetadataWriter {
HashMap<List<String>, ListenableFuture<Void>> executionMap =
this.currentExecutionMap.get(tableKey);
//iterate over all executions and get each result to make sure they all
succeeded
for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution :
executionMap.entrySet()) {
- try {
+ try (Timer.Context context = new Timer().time()) {
execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+ log.info("Time taken to add partition to table {} is {} ms",
tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
} catch (TimeoutException e) {
// Since TimeoutException should always be a transient issue, throw
RuntimeException which will fail/retry container
throw new RuntimeException("Timeout waiting for result of
registration for table " + tableKey, e);
@@ -177,7 +179,11 @@ public class HiveMetadataWriter implements MetadataWriter {
if (cache != null) {
HiveSpec hiveSpec = cache.getIfPresent(execution.getKey());
if (hiveSpec != null) {
- eventSubmitter.submit(buildCommitEvent(dbName, tableName,
execution.getKey(), hiveSpec, HivePartitionOperation.ADD_OR_MODIFY));
+ try (Timer.Context context = new Timer().time()) {
+ eventSubmitter.submit(buildCommitEvent(dbName, tableName,
execution.getKey(), hiveSpec,
+ HivePartitionOperation.ADD_OR_MODIFY));
+ log.info("Time taken to submit event for table {} is {} ms",
tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
}
}
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index d2d72a969..559874056 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.commons.collections.CollectionUtils;
+
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -69,6 +71,7 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -128,9 +131,17 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
private final Set<String> currentErrorDatasets = new HashSet<>();
@Setter
private int maxErrorDataset;
+ @VisibleForTesting
+ public final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
private final Set<String> transientExceptionMessages;
private final Set<String> nonTransientExceptionMessages;
+ @VisibleForTesting
+ public final Map<String, ContextAwareTimer> metadataWriterWriteTimers = new
HashMap<>();
+ @VisibleForTesting
+ public final Map<String, ContextAwareTimer> metadataWriterFlushTimers = new
HashMap<>();
+ private final ContextAwareTimer hiveSpecComputationTimer;
+ private final Map<String, ContextAwareTimer> datasetTimers = new HashMap<>();
@AllArgsConstructor
static class TableStatus {
@@ -150,19 +161,22 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES,
ClustersNames.getInstance().getClusterName());
state = properties;
maxErrorDataset =
state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET,
DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
+ List<Tag<?>> tags = Lists.newArrayList();
+ String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+ tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME,
clusterIdentifier));
+ metricContext = Instrumented.getMetricContext(state, this.getClass(),
tags);
+ eventSubmitter = new EventSubmitter.Builder(metricContext,
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES,
IcebergMetadataWriter.class.getName())) {
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class,
className, state)));
+ metadataWriterWriteTimers.put(className,
metricContext.contextAwareTimer(className + ".write", 1, TimeUnit.HOURS));
+ metadataWriterFlushTimers.put(className,
metricContext.contextAwareTimer(className + ".flush", 1, TimeUnit.HOURS));
}
+ hiveSpecComputationTimer =
metricContext.contextAwareTimer("hiveSpec.computation", 1, TimeUnit.HOURS);
tableOperationTypeMap = new HashMap<>();
parallelRunner = closer.register(new
ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20),
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS,
DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
- List<Tag<?>> tags = Lists.newArrayList();
- String clusterIdentifier = ClustersNames.getInstance().getClusterName();
- tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME,
clusterIdentifier));
- MetricContext metricContext = Instrumented.getMetricContext(state,
this.getClass(), tags);
- eventSubmitter = new EventSubmitter.Builder(metricContext,
GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
transientExceptionMessages = new
HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
nonTransientExceptionMessages = new
HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
}
@@ -187,26 +201,28 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
*/
private void computeSpecMap(List<String> files, ConcurrentHashMap<String,
Collection<HiveSpec>> specsMap,
Cache<String, Collection<HiveSpec>> cache, State registerState, boolean
isPrefix) throws IOException {
- HiveRegistrationPolicy policy =
HiveRegistrationPolicyBase.getPolicy(registerState);
- for (String file : files) {
- parallelRunner.submitCallable(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- Path regPath = isPrefix ? new Path(file) : new
Path(file).getParent();
- //Use raw path to comply with HDFS federation setting.
- Path rawPath = new Path(regPath.toUri().getRawPath());
- specsMap.put(regPath.toString(), cache.get(regPath.toString(), ()
-> policy.getHiveSpecs(rawPath)));
- } catch (Throwable e) {
- //todo: Emit failed GMCE in the future to easily track the error
gmce and investigate the reason for that.
- log.warn("Cannot get Hive Spec for {} using policy {} due to:",
file, policy.toString());
- log.warn(e.getMessage());
+ try (Timer.Context context = hiveSpecComputationTimer.time()) {
+ HiveRegistrationPolicy policy =
HiveRegistrationPolicyBase.getPolicy(registerState);
+ for (String file : files) {
+ parallelRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ Path regPath = isPrefix ? new Path(file) : new
Path(file).getParent();
+ //Use raw path to comply with HDFS federation setting.
+ Path rawPath = new Path(regPath.toUri().getRawPath());
+ specsMap.put(regPath.toString(), cache.get(regPath.toString(),
() -> policy.getHiveSpecs(rawPath)));
+ } catch (Throwable e) {
+ //todo: Emit failed GMCE in the future to easily track the error
gmce and investigate the reason for that.
+ log.warn("Cannot get Hive Spec for {} using policy {} due to:",
file, policy.toString());
+ log.warn(e.getMessage());
+ }
+ return null;
}
- return null;
- }
- }, file);
+ }, file);
+ }
+ parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
}
- parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
}
@Override
@@ -341,7 +357,12 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
writer.reset(dbName, tableName);
} else {
try {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ Timer writeTimer =
metadataWriterWriteTimers.get(writer.getClass().getName());
+ Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k ->
metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS));
+ try (Timer.Context writeContext = writeTimer.time();
+ Timer.Context datasetContext = datasetTimer.time()) {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap,
spec);
+ }
} catch (Exception e) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient
exception for db: " + dbName + " table: " + tableName, e);
@@ -419,7 +440,12 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
writer.reset(dbName, tableName);
} else {
try {
- writer.flush(dbName, tableName);
+ Timer flushTimer =
metadataWriterFlushTimers.get(writer.getClass().getName());
+ Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k ->
metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS));
+ try (Timer.Context flushContext = flushTimer.time();
+ Timer.Context datasetContext = datasetTimer.time()) {
+ writer.flush(dbName, tableName);
+ }
} catch (IOException e) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient
exception for db: " + dbName + " table: " + tableName, e);
@@ -480,6 +506,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
}
entry.getValue().clear();
}
+ logTimers();
}
@Override
@@ -565,4 +592,16 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
List<MetadataWriter> failedWriters =
metadataWriters.subList(metadataWriters.indexOf(failedWriter),
metadataWriters.size());
return failedWriters.stream().map(writer ->
writer.getClass().getName()).collect(Collectors.toList());
}
+
+ private void logTimers() {
+ logTimer(hiveSpecComputationTimer);
+ metadataWriterWriteTimers.values().forEach(this::logTimer);
+ metadataWriterFlushTimers.values().forEach(this::logTimer);
+ datasetTimers.values().forEach(this::logTimer);
+ }
+
+ private void logTimer(ContextAwareTimer timer) {
+ log.info("Timer {} 1 hour mean duration: {} ms", timer.getName(),
TimeUnit.NANOSECONDS.toMillis((long) timer.getSnapshot().getMean()));
+ log.info("Timer {} 1 hour 99th percentile duration: {} ms",
timer.getName(), TimeUnit.NANOSECONDS.toMillis((long)
timer.getSnapshot().get99thPercentile()));
+ }
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ecd528323..57a28f778 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -827,7 +827,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+ try (Timer.Context context = new Timer().time()) {
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+ log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topicName,
tableMetadata.datePartitions, props);
}
@@ -870,20 +873,25 @@ public class IcebergMetadataWriter implements
MetadataWriter {
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName)) {
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps =
tableMetadata.table.get().properties();
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
+ try (Timer.Context context = new Timer().time()) {
+ submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
+ log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid.toString()));
+ log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
} else {
- log.info("There's no transaction initiated for the table {}",
tid.toString());
+ log.info("There's no transaction initiated for the table {}", tid);
}
} catch (RuntimeException e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index bf7dcb383..e5819d6ba 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -348,6 +348,10 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
MetadataWriter mockWriter = Mockito.mock(MetadataWriter.class);
Mockito.doThrow(new IOException("Test
failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any());
gobblinMCEWriter.metadataWriters.add(0, mockWriter);
+
gobblinMCEWriter.metadataWriterWriteTimers.put(mockWriter.getClass().getName(),
gobblinMCEWriter.metricContext
+ .contextAwareTimer(mockWriter.getClass().getName() + ".write", 1,
TimeUnit.HOURS));
+
gobblinMCEWriter.metadataWriterFlushTimers.put(mockWriter.getClass().getName(),
gobblinMCEWriter.metricContext
+ .contextAwareTimer(mockWriter.getClass().getName() + ".flush", 1,
TimeUnit.HOURS));
GobblinMetadataChangeEvent gmceWithMockWriter =
SpecificData.get().deepCopy(gmce.getSchema(), gmce);
gmceWithMockWriter.setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName(),
mockWriter.getClass().getName()));