This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e36fe91715 [HUDI-7924] Capture Latency and Failure Metrics For Hive 
Table recreation (#11498)
8e36fe91715 is described below

commit 8e36fe91715d96785fab63f51b3ab6ae61f2b53c
Author: vamsikarnika <[email protected]>
AuthorDate: Fri Jun 28 14:35:20 2024 +0530

    [HUDI-7924] Capture Latency and Failure Metrics For Hive Table recreation 
(#11498)
    
    Added latency and failure metrics for table recreation on meta sync failure.
    These new metrics are pushed to Prometheus, which helps in monitoring
    the performance of table recreation.
    
    ---------
    
    Co-authored-by: Vamsi <[email protected]>
    Co-authored-by: Vamsi <[email protected]>
---
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |   8 ++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |   6 ++
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |  12 +++
 .../apache/hudi/sync/common/HoodieSyncTool.java    |   3 +
 .../sync/common/metrics/HoodieMetaSyncMetrics.java | 118 +++++++++++++++++++++
 .../common/metrics/TestHoodieMetaSyncMetrics.java  | 106 ++++++++++++++++++
 6 files changed, 253 insertions(+)

diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c2d2dd26fe8..1c2056785b7 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -33,6 +33,7 @@ import 
org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
 
 import com.beust.jcommander.JCommander;
+import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.parquet.schema.MessageType;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -319,12 +321,18 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
 
+  /**
+   * Recreates the table via createOrReplaceTable, re-syncs all partitions and updates the last
+   * synced commit time, recording recreate-table latency and failure metrics along the way.
+   *
+   * @throws HoodieHiveSyncException wrapping any HoodieHiveSyncException raised while recreating
+   *                                 the table or syncing its partitions
+   */
   private void recreateAndSyncHiveTable(String tableName, boolean 
useRealtimeInputFormat, boolean readAsOptimized) {
     LOG.info("recreating and syncing the table {}", tableName);
+    // May be null: the metrics helper returns no timer context when metrics are disabled.
+    Timer.Context timerContext = metrics.getRecreateAndSyncTimer();
     MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
     try {
       createOrReplaceTable(tableName, useRealtimeInputFormat, readAsOptimized, 
schema);
       syncAllPartitions(tableName);
       syncClient.updateLastCommitTimeSynced(tableName);
+      // Only successful recreations report a duration; on failure the context is never stopped.
+      if (Objects.nonNull(timerContext)) {
+        long durationInNs = timerContext.stop();
+        metrics.updateRecreateAndSyncDurationInMs(durationInNs);
+      }
     } catch (HoodieHiveSyncException ex) {
+      // Counts only HoodieHiveSyncException raised inside the try block (the schema read above is
+      // outside it). If this metrics call itself throws, that exception replaces ex.
+      metrics.incrementRecreateAndSyncFailureCounter();
       throw new HoodieHiveSyncException("failed to recreate the table for " + 
tableName, ex);
     }
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index f8b067014c3..34dea62db1f 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -44,6 +46,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.IMetaStoreClientUtil;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
@@ -829,6 +832,9 @@ public class TestHiveSyncTool {
     hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
     hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
     hiveSyncProps.setProperty(RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
+    hiveSyncProps.setProperty(HoodieMetricsConfig.TURN_METRICS_ON.key(), 
"true");
+    hiveSyncProps.setProperty(HoodieCommonConfig.BASE_PATH.key(), basePath);
+    
hiveSyncProps.setProperty(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(),
 MetricsReporterType.INMEMORY.name());
 
     String commitTime1 = "100";
     HiveTestUtil.createCOWTable(commitTime1, 5, true);
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 6662de5772e..b6ee02355ff 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HadoopConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
@@ -44,6 +45,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
 import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
@@ -199,6 +201,7 @@ public class HoodieSyncConfig extends HoodieConfig {
           + "obtained from Hudi's internal metadata table. Note, " + 
HoodieMetadataConfig.ENABLE + " must be set to true.");
 
   private Configuration hadoopConf;
+  private HoodieMetricsConfig metricsConfig;
 
   public HoodieSyncConfig(Properties props) {
     this(props, HadoopConfigUtils.createHadoopConf(props));
@@ -213,6 +216,11 @@ public class HoodieSyncConfig extends HoodieConfig {
         .collect(Collectors.joining("\n")));
     setDefaults(HoodieSyncConfig.class.getName());
     this.hadoopConf = hadoopConf;
+    this.metricsConfig = 
HoodieMetricsConfig.newBuilder().fromProperties(props).build();
+  }
+
+  /**
+   * Returns the value configured under {@code BASE_PATH}.
+   *
+   * <p>NOTE(review): this reads BASE_PATH, not META_SYNC_BASE_PATH (which getHadoopFileSystem
+   * uses); the value may be absent unless callers set it explicitly — confirm it is populated
+   * whenever metrics are enabled.
+   */
+  public String getBasePath() {
+    return getString(BASE_PATH);
   }
 
   public void setHadoopConf(Configuration hadoopConf) {
@@ -223,6 +231,10 @@ public class HoodieSyncConfig extends HoodieConfig {
     return hadoopConf;
   }
 
+  /**
+   * Returns the metrics configuration built from the properties passed to the constructor.
+   */
+  public HoodieMetricsConfig getMetricsConfig() {
+    return metricsConfig;
+  }
+
+  /**
+   * Returns the Hadoop {@link FileSystem} for the path configured under META_SYNC_BASE_PATH.
+   */
   public FileSystem getHadoopFileSystem() {
     return HadoopFSUtils.getFs(getString(META_SYNC_BASE_PATH), 
getHadoopConf());
   }
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
index c614a7ae82b..ed341ed7c99 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -19,6 +19,7 @@ package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.HadoopConfigUtils;
+import org.apache.hudi.sync.common.metrics.HoodieMetaSyncMetrics;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,7 @@ public abstract class HoodieSyncTool implements AutoCloseable 
{
 
   protected Properties props;
   protected Configuration hadoopConf;
+  protected HoodieMetaSyncMetrics metrics;
 
   public HoodieSyncTool(Properties props) {
     this(props, HadoopConfigUtils.createHadoopConf(props));
@@ -41,6 +43,7 @@ public abstract class HoodieSyncTool implements AutoCloseable 
{
   public HoodieSyncTool(Properties props, Configuration hadoopConf) {
     this.props = props;
     this.hadoopConf = hadoopConf;
+    // Metric names are suffixed with the concrete tool's simple class name (e.g. "HiveSyncTool").
+    // NOTE(review): this builds a HoodieSyncConfig from the same props purely for metrics;
+    // subclasses may construct their own config as well — confirm the duplication is intended.
+    this.metrics = new HoodieMetaSyncMetrics(new HoodieSyncConfig(props, 
hadoopConf), getClass().getSimpleName());
   }
 
   @Deprecated
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
new file mode 100644
index 00000000000..cbe729ae030
--- /dev/null
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.metrics;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HoodieMetaSyncMetrics {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetaSyncMetrics.class);
+  private static final String TIMER_ACTION = "timer";
+  private static final String COUNTER_ACTION = "counter";
+  private static final String META_SYNC_RECREATE_TABLE_METRIC = 
"meta_sync.recreate_table";
+  private static final String META_SYNC_RECREATE_TABLE_FAILURE_METRIC = 
"meta_sync.recreate_table.failure";
+  private static final String META_SYNC_ACTION = "meta_sync";
+  private static final String RECREATE_TABLE_DURATION_MS_METRIC = 
"recreate_table_duration_ms";
+  // Metrics are shut down by the shutdown hook added in the Metrics class
+  private Metrics metrics;
+  private HoodieMetricsConfig metricsConfig;
+  private transient HoodieStorage storage;
+
+  private final String syncToolName;
+
+  private static String recreateAndSyncFailureCounterName;
+  private static String recreateAndSyncTimerName;
+
+  private Timer recreateAndSyncTimer;
+  private Counter recreateAndSyncFailureCounter;
+
+  public HoodieMetaSyncMetrics(HoodieSyncConfig config, String syncToolName) {
+    this.metricsConfig = config.getMetricsConfig();
+    this.syncToolName = syncToolName;
+    if (metricsConfig.isMetricsOn()) {
+      this.storage = HoodieStorageUtils.getStorage(config.getBasePath(), 
HadoopFSUtils.getStorageConf(config.getHadoopConf()));
+      metrics = Metrics.getInstance(metricsConfig, storage);
+      recreateAndSyncTimerName = getMetricsName(TIMER_ACTION, 
META_SYNC_RECREATE_TABLE_METRIC);
+      recreateAndSyncFailureCounterName = getMetricsName(COUNTER_ACTION, 
META_SYNC_RECREATE_TABLE_FAILURE_METRIC);
+    }
+  }
+
+  public Metrics getMetrics() {
+    return metrics;
+  }
+
+  public Timer.Context getRecreateAndSyncTimer() {
+    if (metricsConfig.isMetricsOn() && recreateAndSyncTimer == null) {
+      recreateAndSyncTimer = createTimer(recreateAndSyncTimerName);
+    }
+    return recreateAndSyncTimer == null ? null : recreateAndSyncTimer.time();
+  }
+
+  private Timer createTimer(String name) {
+    return metricsConfig.isMetricsOn() ? metrics.getRegistry().timer(name) : 
null;
+  }
+
+  public void incrementRecreateAndSyncFailureCounter() {
+    recreateAndSyncFailureCounter = getCounter(recreateAndSyncFailureCounter, 
recreateAndSyncFailureCounterName);
+    recreateAndSyncFailureCounter.inc();
+  }
+
+  public void updateRecreateAndSyncDurationInMs(long durationInNs) {
+    if (metricsConfig.isMetricsOn()) {
+      long durationInMs = getDurationInMs(durationInNs);
+      LOG.info("Sending recreate and sync metrics {}", durationInMs);
+      metrics.registerGauge(getMetricsName(META_SYNC_ACTION, 
RECREATE_TABLE_DURATION_MS_METRIC), durationInMs);
+    }
+  }
+
+  /**
+   * By default, the timer context returns duration with nano seconds. Convert 
it to millisecond.
+   */
+  private long getDurationInMs(long ctxDuration) {
+    return ctxDuration / 1000000;
+  }
+
+  @VisibleForTesting
+  public String getMetricsName(String action, String metric) {
+    if 
(StringUtils.isNullOrEmpty(metricsConfig.getMetricReporterMetricsNamePrefix())) 
{
+      return String.format("%s.%s.%s", action, metric, syncToolName);
+    } else {
+      return String.format("%s.%s.%s.%s", 
metricsConfig.getMetricReporterMetricsNamePrefix(), action, metric, 
syncToolName);
+    }
+  }
+
+  public Counter getCounter(Counter counter, String name) {
+    if (counter == null) {
+      return metrics.getRegistry().counter(name);
+    }
+    return counter;
+  }
+}
diff --git 
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
new file mode 100644
index 00000000000..2a753b1fe99
--- /dev/null
+++ 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.metrics;
+
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHoodieMetaSyncMetrics {
+
+  @Mock
+  HoodieSyncConfig syncConfig;
+  @Mock
+  HoodieMetricsConfig metricsConfig;
+  HoodieMetaSyncMetrics hoodieSyncMetrics;
+  Metrics metrics;
+  // Recreated for every test in setUp(), so these are per-instance rather than static.
+  private Configuration hadoopConf;
+  private Path basepath;
+
+  /**
+   * Builds a metrics-enabled {@link HoodieMetaSyncMetrics} using the in-memory reporter and a fresh
+   * temporary base path.
+   */
+  @BeforeEach
+  void setUp() throws IOException {
+    hadoopConf = new Configuration();
+    basepath = Files.createTempDirectory("hivesyncmetricstest" + Instant.now().toEpochMilli());
+    when(metricsConfig.isMetricsOn()).thenReturn(true);
+    when(syncConfig.getMetricsConfig()).thenReturn(metricsConfig);
+    when(syncConfig.getHadoopConf()).thenReturn(hadoopConf);
+    when(syncConfig.getBasePath()).thenReturn(basepath.toUri().toString());
+    when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
+    when(metricsConfig.getBasePath()).thenReturn(basepath.toUri().toString());
+    when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("test_prefix");
+    hoodieSyncMetrics = new HoodieMetaSyncMetrics(syncConfig, "TestHiveSyncTool");
+    metrics = hoodieSyncMetrics.getMetrics();
+  }
+
+  /**
+   * Shuts metrics down before removing the temporary directory, so a failing delete can no longer
+   * skip the shutdown.
+   */
+  @AfterEach
+  void shutdownMetrics() throws IOException {
+    try {
+      if (metrics != null) {
+        metrics.shutdown();
+      }
+    } finally {
+      Files.deleteIfExists(basepath);
+    }
+  }
+
+  @Test
+  void testUpdateRecreateAndSyncDurationInMs() throws InterruptedException {
+    Timer.Context timerCtx = hoodieSyncMetrics.getRecreateAndSyncTimer();
+    Thread.sleep(5);
+    long durationInNs = timerCtx.stop();
+    hoodieSyncMetrics.updateRecreateAndSyncDurationInMs(durationInNs);
+    String metricName = hoodieSyncMetrics.getMetricsName("meta_sync", "recreate_table_duration_ms");
+    long timeInMs = (Long) metrics.getRegistry().getGauges().get(metricName).getValue();
+    assertTrue(timeInMs > 0, "recreate_table duration metric value should be > 0");
+  }
+
+  @Test
+  void testIncrementRecreateAndSyncFailureCounter() {
+    hoodieSyncMetrics.incrementRecreateAndSyncFailureCounter();
+    String metricsName = hoodieSyncMetrics.getMetricsName("counter", "meta_sync.recreate_table.failure");
+    long count = metrics.getRegistry().getCounters().get(metricsName).getCount();
+    assertEquals(1, count, "recreate_table failure counter value should be 1");
+  }
+
+  @Test
+  void testIncrementRecreateAndSyncFailureCounter_WithoutMetricsNamePrefix() {
+    when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
+    hoodieSyncMetrics = new HoodieMetaSyncMetrics(syncConfig, "TestHiveSyncTool");
+    metrics = hoodieSyncMetrics.getMetrics();
+    hoodieSyncMetrics.incrementRecreateAndSyncFailureCounter();
+    String metricsName = hoodieSyncMetrics.getMetricsName("counter", "meta_sync.recreate_table.failure");
+    long count = metrics.getRegistry().getCounters().get(metricsName).getCount();
+    assertEquals(1, count, "recreate_table failure counter value should be 1");
+  }
+}

Reply via email to