This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8e36fe91715 [HUDI-7924] Capture Latency and Failure Metrics For Hive
Table recreation (#11498)
8e36fe91715 is described below
commit 8e36fe91715d96785fab63f51b3ab6ae61f2b53c
Author: vamsikarnika <[email protected]>
AuthorDate: Fri Jun 28 14:35:20 2024 +0530
[HUDI-7924] Capture Latency and Failure Metrics For Hive Table recreation
(#11498)
Adds latency and failure metrics for Hive table recreation on meta sync failure.
The new metrics are pushed to Prometheus, which helps in monitoring
the performance of table recreation.
---------
Co-authored-by: Vamsi <[email protected]>
Co-authored-by: Vamsi <[email protected]>
---
.../java/org/apache/hudi/hive/HiveSyncTool.java | 8 ++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 6 ++
.../apache/hudi/sync/common/HoodieSyncConfig.java | 12 +++
.../apache/hudi/sync/common/HoodieSyncTool.java | 3 +
.../sync/common/metrics/HoodieMetaSyncMetrics.java | 118 +++++++++++++++++++++
.../common/metrics/TestHoodieMetaSyncMetrics.java | 106 ++++++++++++++++++
6 files changed, 253 insertions(+)
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c2d2dd26fe8..1c2056785b7 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -33,6 +33,7 @@ import
org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import com.beust.jcommander.JCommander;
+import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.parquet.schema.MessageType;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -319,12 +321,18 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
private void recreateAndSyncHiveTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) {
LOG.info("recreating and syncing the table {}", tableName);
+ Timer.Context timerContext = metrics.getRecreateAndSyncTimer(); // null when no timer is available (metrics off), hence the null check before stop()
MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
try {
createOrReplaceTable(tableName, useRealtimeInputFormat, readAsOptimized,
schema);
syncAllPartitions(tableName);
syncClient.updateLastCommitTimeSynced(tableName);
+ if (Objects.nonNull(timerContext)) { // reached only after every step above succeeded, so failed attempts report no duration
+ long durationInNs = timerContext.stop(); // stop() yields the elapsed time in nanoseconds
+ metrics.updateRecreateAndSyncDurationInMs(durationInNs); // converted to milliseconds inside the metrics helper
+ }
} catch (HoodieHiveSyncException ex) {
+ metrics.incrementRecreateAndSyncFailureCounter(); // counts only HoodieHiveSyncException failures, before rethrowing with the table name
throw new HoodieHiveSyncException("failed to recreate the table for " +
tableName, ex);
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index f8b067014c3..34dea62db1f 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -44,6 +46,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -829,6 +832,9 @@ public class TestHiveSyncTool {
hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
hiveSyncProps.setProperty(RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
+ hiveSyncProps.setProperty(HoodieMetricsConfig.TURN_METRICS_ON.key(),
"true");
+ hiveSyncProps.setProperty(HoodieCommonConfig.BASE_PATH.key(), basePath);
+
hiveSyncProps.setProperty(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(),
MetricsReporterType.INMEMORY.name());
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 6662de5772e..b6ee02355ff 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HadoopConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -44,6 +45,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
@@ -199,6 +201,7 @@ public class HoodieSyncConfig extends HoodieConfig {
+ "obtained from Hudi's internal metadata table. Note, " +
HoodieMetadataConfig.ENABLE + " must be set to true.");
private Configuration hadoopConf;
+ private HoodieMetricsConfig metricsConfig;
public HoodieSyncConfig(Properties props) {
this(props, HadoopConfigUtils.createHadoopConf(props));
@@ -213,6 +216,11 @@ public class HoodieSyncConfig extends HoodieConfig {
.collect(Collectors.joining("\n")));
setDefaults(HoodieSyncConfig.class.getName());
this.hadoopConf = hadoopConf;
+ this.metricsConfig =
HoodieMetricsConfig.newBuilder().fromProperties(props).build();
+ }
+
+ public String getBasePath() { // Returns the string stored under the statically imported HoodieCommonConfig.BASE_PATH key.
+ return getString(BASE_PATH);
}
public void setHadoopConf(Configuration hadoopConf) {
@@ -223,6 +231,10 @@ public class HoodieSyncConfig extends HoodieConfig {
return hadoopConf;
}
+ public HoodieMetricsConfig getMetricsConfig() { // Returns the metrics config built from the sync properties in the constructor.
+ return metricsConfig;
+ }
+
public FileSystem getHadoopFileSystem() {
return HadoopFSUtils.getFs(getString(META_SYNC_BASE_PATH),
getHadoopConf());
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
index c614a7ae82b..ed341ed7c99 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -19,6 +19,7 @@ package org.apache.hudi.sync.common;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.HadoopConfigUtils;
+import org.apache.hudi.sync.common.metrics.HoodieMetaSyncMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,7 @@ public abstract class HoodieSyncTool implements AutoCloseable
{
protected Properties props;
protected Configuration hadoopConf;
+ protected HoodieMetaSyncMetrics metrics;
public HoodieSyncTool(Properties props) {
this(props, HadoopConfigUtils.createHadoopConf(props));
@@ -41,6 +43,7 @@ public abstract class HoodieSyncTool implements AutoCloseable
{
public HoodieSyncTool(Properties props, Configuration hadoopConf) {
this.props = props;
this.hadoopConf = hadoopConf;
+ this.metrics = new HoodieMetaSyncMetrics(new HoodieSyncConfig(props,
hadoopConf), getClass().getSimpleName()); // the concrete tool's simple class name becomes the suffix of every metric name
}
@Deprecated
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
new file mode 100644
index 00000000000..cbe729ae030
--- /dev/null
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.metrics;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publishes meta sync metrics (currently for the "recreate table" path): a timer,
+ * a failure counter and a duration gauge.
+ *
+ * <p>When metrics are turned off in the {@link HoodieMetricsConfig}, no {@link Metrics}
+ * instance is created and the timer, counter and gauge methods are no-ops.
+ */
+public class HoodieMetaSyncMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMetaSyncMetrics.class);
+  private static final String TIMER_ACTION = "timer";
+  private static final String COUNTER_ACTION = "counter";
+  private static final String META_SYNC_RECREATE_TABLE_METRIC = "meta_sync.recreate_table";
+  private static final String META_SYNC_RECREATE_TABLE_FAILURE_METRIC = "meta_sync.recreate_table.failure";
+  private static final String META_SYNC_ACTION = "meta_sync";
+  private static final String RECREATE_TABLE_DURATION_MS_METRIC = "recreate_table_duration_ms";
+  // Metrics are shut down by the shutdown hook added in the Metrics class
+  private Metrics metrics;
+  private final HoodieMetricsConfig metricsConfig;
+  private transient HoodieStorage storage;
+
+  private final String syncToolName;
+
+  // Instance fields on purpose: the names embed syncToolName and the configured prefix,
+  // so sharing them statically would let one instance overwrite another's metric names.
+  private String recreateAndSyncFailureCounterName;
+  private String recreateAndSyncTimerName;
+
+  private Timer recreateAndSyncTimer;
+  private Counter recreateAndSyncFailureCounter;
+
+  /**
+   * Creates the metrics helper for one sync tool.
+   *
+   * @param config       sync config supplying the metrics config, base path and Hadoop conf
+   * @param syncToolName name appended as the last segment of every metric name
+   */
+  public HoodieMetaSyncMetrics(HoodieSyncConfig config, String syncToolName) {
+    this.metricsConfig = config.getMetricsConfig();
+    this.syncToolName = syncToolName;
+    if (metricsConfig.isMetricsOn()) {
+      this.storage = HoodieStorageUtils.getStorage(config.getBasePath(),
+          HadoopFSUtils.getStorageConf(config.getHadoopConf()));
+      metrics = Metrics.getInstance(metricsConfig, storage);
+      recreateAndSyncTimerName = getMetricsName(TIMER_ACTION, META_SYNC_RECREATE_TABLE_METRIC);
+      recreateAndSyncFailureCounterName = getMetricsName(COUNTER_ACTION, META_SYNC_RECREATE_TABLE_FAILURE_METRIC);
+    }
+  }
+
+  /**
+   * @return the underlying {@link Metrics} instance, or {@code null} when metrics are off
+   */
+  public Metrics getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Starts timing a recreate-and-sync attempt, lazily creating the timer on first use.
+   *
+   * @return a running timer context, or {@code null} when metrics are off
+   */
+  public Timer.Context getRecreateAndSyncTimer() {
+    if (metricsConfig.isMetricsOn() && recreateAndSyncTimer == null) {
+      recreateAndSyncTimer = createTimer(recreateAndSyncTimerName);
+    }
+    return recreateAndSyncTimer == null ? null : recreateAndSyncTimer.time();
+  }
+
+  private Timer createTimer(String name) {
+    return metricsConfig.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
+  }
+
+  /**
+   * Increments the recreate-and-sync failure counter; does nothing when metrics are off.
+   *
+   * <p>The guard matters because callers invoke this from a catch block: without it the
+   * null {@code metrics} would raise a NullPointerException that hides the original failure.
+   */
+  public void incrementRecreateAndSyncFailureCounter() {
+    if (metricsConfig.isMetricsOn()) {
+      recreateAndSyncFailureCounter = getCounter(recreateAndSyncFailureCounter, recreateAndSyncFailureCounterName);
+      recreateAndSyncFailureCounter.inc();
+    }
+  }
+
+  /**
+   * Registers the recreate-and-sync duration as a gauge in milliseconds; no-op when metrics are off.
+   *
+   * @param durationInNs elapsed time in nanoseconds, as returned by the timer context
+   */
+  public void updateRecreateAndSyncDurationInMs(long durationInNs) {
+    if (metricsConfig.isMetricsOn()) {
+      long durationInMs = getDurationInMs(durationInNs);
+      LOG.info("Sending recreate and sync metrics {}", durationInMs);
+      metrics.registerGauge(getMetricsName(META_SYNC_ACTION, RECREATE_TABLE_DURATION_MS_METRIC), durationInMs);
+    }
+  }
+
+  /**
+   * By default, the timer context returns the duration in nanoseconds. Convert it to milliseconds
+   * (integer division, so sub-millisecond remainders are dropped).
+   */
+  private long getDurationInMs(long ctxDuration) {
+    return ctxDuration / 1000000;
+  }
+
+  /**
+   * Builds a metric name of the form {@code [prefix.]action.metric.syncToolName}; the prefix
+   * segment is omitted when the configured metrics name prefix is null or empty.
+   */
+  @VisibleForTesting
+  public String getMetricsName(String action, String metric) {
+    if (StringUtils.isNullOrEmpty(metricsConfig.getMetricReporterMetricsNamePrefix())) {
+      return String.format("%s.%s.%s", action, metric, syncToolName);
+    } else {
+      return String.format("%s.%s.%s.%s", metricsConfig.getMetricReporterMetricsNamePrefix(), action, metric, syncToolName);
+    }
+  }
+
+  /**
+   * Returns {@code counter} if it is already set, otherwise looks the counter up in the registry
+   * by {@code name}. The registry lookup dereferences {@code metrics}, so callers must only pass
+   * a null counter when metrics are on.
+   */
+  public Counter getCounter(Counter counter, String name) {
+    if (counter == null) {
+      return metrics.getRegistry().counter(name);
+    }
+    return counter;
+  }
+}
diff --git
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
new file mode 100644
index 00000000000..2a753b1fe99
--- /dev/null
+++
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/metrics/TestHoodieMetaSyncMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.metrics;
+
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link HoodieMetaSyncMetrics} using mocked configs and the in-memory reporter.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestHoodieMetaSyncMetrics {
+
+  @Mock
+  HoodieSyncConfig syncConfig;
+  @Mock
+  HoodieMetricsConfig metricsConfig;
+  HoodieMetaSyncMetrics hoodieSyncMetrics;
+  Metrics metrics;
+  // Rebuilt in setUp() for every test, so these are per-instance rather than static.
+  private Configuration hadoopConf;
+  private Path basepath;
+
+  @BeforeEach
+  void setUp() throws IOException {
+    hadoopConf = new Configuration();
+    basepath = Files.createTempDirectory("hivesyncmetricstest" + Instant.now().toEpochMilli());
+    when(metricsConfig.isMetricsOn()).thenReturn(true);
+    when(syncConfig.getMetricsConfig()).thenReturn(metricsConfig);
+    when(syncConfig.getHadoopConf()).thenReturn(hadoopConf);
+    when(syncConfig.getBasePath()).thenReturn(basepath.toUri().toString());
+    when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
+    when(metricsConfig.getBasePath()).thenReturn(basepath.toUri().toString());
+    when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("test_prefix");
+    hoodieSyncMetrics = new HoodieMetaSyncMetrics(syncConfig, "TestHiveSyncTool");
+    metrics = hoodieSyncMetrics.getMetrics();
+  }
+
+  @AfterEach
+  void shutdownMetrics() throws IOException {
+    // Same order as before on the happy path, but shutdown now also runs when the delete fails,
+    // so a failed cleanup does not leave the metrics instance running for later tests.
+    try {
+      Files.delete(basepath);
+    } finally {
+      metrics.shutdown();
+    }
+  }
+
+  @Test
+  void testUpdateRecreateAndSyncDurationInMs() throws InterruptedException {
+    Timer.Context timerCtx = hoodieSyncMetrics.getRecreateAndSyncTimer();
+    // Sleep a few milliseconds so the nanosecond duration converts to a non-zero millisecond value.
+    Thread.sleep(5);
+    long durationInNs = timerCtx.stop();
+    hoodieSyncMetrics.updateRecreateAndSyncDurationInMs(durationInNs);
+    String metricName = hoodieSyncMetrics.getMetricsName("meta_sync", "recreate_table_duration_ms");
+    long timeIsMs = (Long) metrics.getRegistry().getGauges().get(metricName).getValue();
+    assertTrue(timeIsMs > 0, "recreate_table duration metric value should be > 0");
+  }
+
+  @Test
+  void testIncrementRecreateAndSyncFailureCounter() {
+    hoodieSyncMetrics.incrementRecreateAndSyncFailureCounter();
+    String metricsName = hoodieSyncMetrics.getMetricsName("counter", "meta_sync.recreate_table.failure");
+    long count = metrics.getRegistry().getCounters().get(metricsName).getCount();
+    assertEquals(1, count, "recreate_table failure counter value should be 1");
+  }
+
+  @Test
+  void testIncrementRecreateAndSyncFailureCounter_WithoutMetricsNamePrefix() {
+    // Re-stub the prefix as empty and rebuild the helper so metric names omit the prefix segment.
+    when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
+    hoodieSyncMetrics = new HoodieMetaSyncMetrics(syncConfig, "TestHiveSyncTool");
+    metrics = hoodieSyncMetrics.getMetrics();
+    hoodieSyncMetrics.incrementRecreateAndSyncFailureCounter();
+    String metricsName = hoodieSyncMetrics.getMetricsName("counter", "meta_sync.recreate_table.failure");
+    long count = metrics.getRegistry().getCounters().get(metricsName).getCount();
+    assertEquals(1, count, "recreate_table failure counter value should be 1");
+  }
+}