This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ea6a6145223 [HUDI-7763] Fix that multiple jmx reporter can exist if
metadata enables (#11226)
ea6a6145223 is described below
commit ea6a6145223a4b029a97215b31edddf5b1774e9b
Author: JH <[email protected]>
AuthorDate: Thu Jun 27 17:08:59 2024 +0900
[HUDI-7763] Fix that multiple jmx reporter can exist if metadata enables
(#11226)
---
.../apache/hudi/metrics/TestHoodieJmxMetrics.java | 54 ++++++++++++++++++++++
.../apache/hudi/metrics/JmxMetricsReporter.java | 36 +++++++++++----
2 files changed, 81 insertions(+), 9 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index 20542cc4cc2..80f42e93421 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -33,6 +34,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.when;
/**
@@ -78,4 +81,55 @@ public class TestHoodieJmxMetrics {
assertEquals("123", metrics.getRegistry().getGauges()
.get("jmx_metric2").getValue().toString());
}
+
+ /**
+  * Builds two {@link HoodieMetrics} instances that are configured with the same JMX port range
+  * but different base paths, then verifies that a gauge registered on each resulting
+  * {@link Metrics} can be read back from that instance's registry.
+  *
+  * <p>NOTE(review): the port range is hard-coded; presumably the test depends on those ports
+  * being free on the host running it - confirm.
+  */
+ @Test
+ public void testMultipleJmxReporterServer() {
+   final String portRange = "9889-9890";
+
+   // First reporter: stub the mocked config, then build the metrics instance.
+   final String firstBasePath = "s3://test" + UUID.randomUUID();
+   clearInvocations(metricsConfig);
+   when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+   when(metricsConfig.getJmxHost()).thenReturn("localhost");
+   when(metricsConfig.getJmxPort()).thenReturn(portRange);
+   when(metricsConfig.getBasePath()).thenReturn(firstBasePath);
+   hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
+   metrics = hoodieMetrics.getMetrics();
+
+   // Second reporter: same host and port range, different base path.
+   final String secondBasePath = "s3://test2" + UUID.randomUUID();
+   clearInvocations(metricsConfig);
+   when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+   when(metricsConfig.getJmxHost()).thenReturn("localhost");
+   when(metricsConfig.getJmxPort()).thenReturn(portRange);
+   when(metricsConfig.getBasePath()).thenReturn(secondBasePath);
+   hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
+   Metrics secondMetrics = hoodieMetrics.getMetrics();
+
+   // Each instance must expose the gauge that was registered on it.
+   metrics.registerGauge("jmx_metric3", 123L);
+   String firstGaugeValue =
+       metrics.getRegistry().getGauges().get("jmx_metric3").getValue().toString();
+   assertEquals("123", firstGaugeValue);
+
+   secondMetrics.registerGauge("jmx_metric4", 123L);
+   String secondGaugeValue =
+       secondMetrics.getRegistry().getGauges().get("jmx_metric4").getValue().toString();
+   assertEquals("123", secondGaugeValue);
+ }
+
+ /**
+  * Configures a single JMX port, builds one {@link HoodieMetrics} on it, and then expects that
+  * building a second {@link HoodieMetrics} with the same single port throws
+  * {@link HoodieException}.
+  *
+  * <p>NOTE(review): the port is hard-coded; presumably the test depends on it being free on
+  * the host before the first instance is created - confirm.
+  */
+ @Test
+ public void testMultipleJmxReporterServerFailedForOnePort() {
+   final String singlePort = "9891";
+
+   // First reporter is created normally with the only configured port.
+   final String firstBasePath = "s3://test" + UUID.randomUUID();
+   clearInvocations(metricsConfig);
+   when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+   when(metricsConfig.getJmxHost()).thenReturn("localhost");
+   when(metricsConfig.getJmxPort()).thenReturn(singlePort);
+   when(metricsConfig.getBasePath()).thenReturn(firstBasePath);
+   hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
+   metrics = hoodieMetrics.getMetrics();
+
+   // Second reporter is configured with the same single port and a different base path.
+   final String secondBasePath = "s3://test2" + UUID.randomUUID();
+   clearInvocations(metricsConfig);
+   when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+   when(metricsConfig.getJmxHost()).thenReturn("localhost");
+   when(metricsConfig.getJmxPort()).thenReturn(singlePort);
+   when(metricsConfig.getBasePath()).thenReturn(secondBasePath);
+
+   assertThrows(
+       HoodieException.class,
+       () -> hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage()));
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
b/hudi-common/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
index b341fc356f1..b56a41888ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
+import java.rmi.server.ExportException;
import java.util.Objects;
import java.util.stream.IntStream;
@@ -53,16 +54,13 @@ public class JmxMetricsReporter extends MetricsReporter {
host, portsConfig));
}
int[] ports = getPortRangeFromString(portsConfig);
- boolean successfullyStartedServer = false;
- for (int port : ports) {
- jmxReporterServer = createJmxReport(host, port);
- LOG.info("Started JMX server on port " + port + ".");
- successfullyStartedServer = true;
- break;
- }
+ initializeJmxReporterServer(host, ports);
+
+ boolean successfullyStartedServer = isServerCreated();
if (!successfullyStartedServer) {
throw new HoodieException(
- "Could not start JMX server on any configured port. Ports: " +
portsConfig);
+ "Could not start JMX server on any configured port. Ports: " +
portsConfig
+ + ". Maybe require port range for multiple hoodie tables");
}
LOG.info("Configured JMXReporter with {port:" + portsConfig + "}");
} catch (Exception e) {
@@ -72,9 +70,29 @@ public class JmxMetricsReporter extends MetricsReporter {
}
}
+ /**
+  * Reports whether a JMX reporter server has been assigned to this reporter.
+  *
+  * @return {@code true} if {@code jmxReporterServer} is non-null, {@code false} otherwise
+  */
+ private boolean isServerCreated() {
+   return Objects.nonNull(jmxReporterServer);
+ }
+
+ /**
+  * Tries the candidate ports in array order and keeps the first JMX reporter server that
+  * {@code createJmxReport} manages to create.
+  *
+  * <p>A port on which creation fails is skipped instead of aborting initialization. If every
+  * port fails, {@code jmxReporterServer} is left unchanged by this method; callers check the
+  * outcome through {@link #isServerCreated()}.
+  *
+  * @param host  host name handed to {@code createJmxReport}
+  * @param ports candidate ports, tried in the given order
+  */
+ private void initializeJmxReporterServer(String host, int[] ports) {
+   for (int port : ports) {
+     try {
+       jmxReporterServer = createJmxReport(host, port);
+       LOG.info("Started JMX server on port {}.", port);
+       // First successful port wins; remaining candidates are not tried.
+       return;
+     } catch (Exception e) {
+       if (e.getCause() instanceof ExportException) {
+         // Treated as "port already taken" (per the log text below); this is the expected path
+         // when several reporters share one port range, so it stays at INFO.
+         LOG.info("Skip for initializing jmx port {} because of already in use", port);
+       } else {
+         // Any other failure is unexpected: keep trying the next port, but log at WARN and
+         // pass the exception as the last argument so the stack trace is not lost.
+         LOG.warn("Failed to initialize jmx port {}. {}", port, e.getMessage(), e);
+       }
+     }
+   }
+ }
+
@Override
public void start() {
- if (jmxReporterServer != null) {
+ if (isServerCreated()) {
jmxReporterServer.start();
} else {
LOG.error("Cannot start as the jmxReporter is null.");