This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.7 by this push:
new bcb591ef [#796][0.7] bug: Fix the issues of MetricReporter (#821)
bcb591ef is described below
commit bcb591efe3ce1a6ad74d9cde533b6489b2f25db0
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Apr 13 14:58:10 2023 +0800
[#796][0.7] bug: Fix the issues of MetricReporter (#821)
### What changes were proposed in this pull request?
1. Support custom config keys defined in plugins.
2. Refactor the logic for loading the config file.
3. Fix some issues with MetricReporter.
### Why are the changes needed?
Metric reporter is unusable.
Fix: #796
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit tests and manual testing.
---
.../apache/uniffle/common/config/RssBaseConf.java | 17 +++++-------
.../org/apache/uniffle/common/config/RssConf.java | 32 ++++++++++++++++++++++
.../common/metrics/MetricReporterFactory.java | 2 +-
.../PrometheusPushGatewayMetricReporter.java | 2 +-
.../common/metrics/MetricReporterFactoryTest.java} | 31 +++++++++++----------
.../uniffle/coordinator/CoordinatorConf.java | 20 +-------------
.../uniffle/coordinator/CoordinatorServer.java | 3 ++
.../uniffle/coordinator/CoordinatorConfTest.java | 2 ++
coordinator/src/test/resources/coordinator.conf | 1 +
docs/coordinator_guide.md | 1 +
docs/server_guide.md | 1 +
.../org/apache/uniffle/server/ShuffleServer.java | 3 ++
.../apache/uniffle/server/ShuffleServerConf.java | 26 +-----------------
.../uniffle/server/ShuffleServerConfTest.java | 5 ++--
server/src/test/resources/confTest.conf | 1 +
15 files changed, 75 insertions(+), 72 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 761e5bf3..ae6bce0d 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.util.RssUtils;
public class RssBaseConf extends RssConf {
@@ -212,21 +213,17 @@ public class RssBaseConf extends RssConf {
.defaultValue(5L)
.withDescription("Reconfigure check interval.");
- public boolean loadCommonConf(Map<String, String> properties) {
+ public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>>
configOptions) {
+ Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
return false;
}
+ return loadCommonConf(properties) && loadConf(properties, configOptions,
true);
+ }
+ public boolean loadCommonConf(Map<String, String> properties) {
List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(RssBaseConf.class);
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
- });
-
- return true;
+ return loadConf(properties, configOptions, false);
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index f7f832a1..1f9da666 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -18,10 +18,12 @@
package org.apache.uniffle.common.config;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import com.google.common.collect.Sets;
@@ -600,6 +602,36 @@ public class RssConf implements Cloneable {
return getRawValue(configOption.key());
}
+ /**
+ * loadConf
+ * @param properties all config items in the configuration file
+ * @param configOptions the config items defined in base config class
+ * @param includeMissingKey whether to include keys that are not defined in the base
config class
+ * @return true if load successfully, otherwise false
+ */
+ public boolean loadConf(
+ Map<String, String> properties,
+ List<ConfigOption<Object>> configOptions,
+ boolean includeMissingKey) {
+ if (properties == null || configOptions == null) {
+ return false;
+ }
+ Map<String, ConfigOption<Object>> configOptionMap =
+ configOptions.stream().collect(Collectors.toMap(c ->
c.key().toLowerCase(), c -> c));
+ properties.forEach((k, v) -> {
+ ConfigOption<Object> config = configOptionMap.get(k.toLowerCase());
+ if (config == null) {
+ // if the key is not defined in configOptions, set it as a string value
+ if (includeMissingKey) {
+ setString(k, v);
+ }
+ } else {
+ set(config, ConfigUtils.convertValue(v, config.getClazz()));
+ }
+ });
+ return true;
+ }
+
@Override
public int hashCode() {
int hash = 0;
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
index d66576df..d25a9527 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
@@ -33,7 +33,7 @@ public class MetricReporterFactory {
}
Class<?> klass = Class.forName(name);
Constructor<?> constructor;
- constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
+ constructor = klass.getConstructor(RssConf.class, String.class);
return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
index 8795d5b6..1b55c886 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
@@ -68,7 +68,7 @@ public class PrometheusPushGatewayMetricReporter extends
AbstractMetricReporter
scheduledExecutorService.scheduleWithFixedDelay(() -> {
for (CollectorRegistry registry : registryList) {
try {
- pushGateway.push(registry, jobName, groupingKey);
+ pushGateway.pushAdd(registry, jobName, groupingKey);
} catch (Throwable e) {
LOG.error("Failed to send metrics to push gateway.", e);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
similarity index 58%
copy from
common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
copy to
common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
index d66576df..e3071a37 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
@@ -17,23 +17,26 @@
package org.apache.uniffle.common.metrics;
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
+import
org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MetricReporterFactoryTest {
+
+ @Test
+ public void testGetMetricReporter() throws Exception {
+ CustomRssConf conf = new CustomRssConf();
+ conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
+ PrometheusPushGatewayMetricReporter.class.getCanonicalName());
+ MetricReporter metricReporter =
MetricReporterFactory.getMetricReporter(conf, "1");
+ assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
+ }
+
+ class CustomRssConf extends RssConf {
-public class MetricReporterFactory {
-
- public static MetricReporter getMetricReporter(RssConf conf, String
instanceId) throws Exception {
- String name = conf.get(RssBaseConf.RSS_METRICS_REPORTER_CLASS);
- if (StringUtils.isEmpty(name)) {
- return null;
- }
- Class<?> klass = Class.forName(name);
- Constructor<?> constructor;
- constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
- return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 6f2b9413..424d7d1e 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -18,13 +18,11 @@
package org.apache.uniffle.coordinator;
import java.util.List;
-import java.util.Map;
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
import
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
@@ -209,22 +207,6 @@ public class CoordinatorConf extends RssBaseConf {
}
public boolean loadConfFromFile(String fileName) {
- Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
- if (properties == null) {
- return false;
- }
-
- loadCommonConf(properties);
-
- List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(CoordinatorConf.class);
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
- });
- return true;
+ return loadConfFromFile(fileName,
ConfigUtils.getAllConfigOptions(CoordinatorConf.class));
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index a016df6a..fa648983 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -101,6 +101,9 @@ public class CoordinatorServer extends ReconfigurableBase {
startReconfigureThread();
jettyServer.start();
server.start();
+ if (metricReporter != null) {
+ metricReporter.start();
+ }
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index 3d06f79f..0ece6b82 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -46,6 +46,8 @@ public class CoordinatorConfTest {
assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE));
assertEquals(60 * 1000,
conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL));
+ // test custom keys defined in plugins
+ assertEquals("v1", conf.getString("plugin.custom.key", null));
}
}
diff --git a/coordinator/src/test/resources/coordinator.conf
b/coordinator/src/test/resources/coordinator.conf
index a29bfdfb..82e6889c 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1
rss.coordinator.access.loadChecker.serverNum.threshold 2
rss.coordinator.access.loadChecker.memory.percentage 20.0
rss.coordinator.dynamicClientConf.updateIntervalSec 1
+plugin.custom.key v1
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index d53a5b37..dc7f1975 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -126,6 +126,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in
metrics reporter, whi
|Property Name|Default| Description
|
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL
including scheme, host name, and port.
|
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key
which is the group and global labels of all metrics. The label name and value
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2.
Please ensure that your grouping key meets the [Prometheus
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
|
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which
metrics will be pushed.
|
diff --git a/docs/server_guide.md b/docs/server_guide.md
index e3f9a99c..11f6782d 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -98,6 +98,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in
metrics reporter, whi
|Property Name|Default| Description
|
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL
including scheme, host name, and port.
|
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key
which is the group and global labels of all metrics. The label name and value
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2.
Please ensure that your grouping key meets the [Prometheus
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
|
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which
metrics will be pushed.
|
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 796ba305..e1058dd8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -110,6 +110,9 @@ public class ShuffleServer {
registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
+ if (metricReporter != null) {
+ metricReporter.start();
+ }
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5b181075..fc2f8024 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.server;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -26,7 +25,6 @@ import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
public class ShuffleServerConf extends RssBaseConf {
@@ -380,28 +378,6 @@ public class ShuffleServerConf extends RssBaseConf {
}
public boolean loadConfFromFile(String fileName) {
- Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
- if (properties == null) {
- return false;
- }
-
- loadCommonConf(properties);
-
- List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class);
-
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
-
- if (k.startsWith(PREFIX_HADOOP_CONF)) {
- setString(k, v);
- }
- });
-
- return true;
+ return loadConfFromFile(fileName,
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class));
}
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index f70ca342..07f180a5 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -61,8 +61,9 @@ public class ShuffleServerConfTest {
assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
assertEquals(2,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1",
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
- assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
- assertEquals("GRPC",
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+ assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b",
""));
+ assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE));
+ assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}
@Test
diff --git a/server/src/test/resources/confTest.conf
b/server/src/test/resources/confTest.conf
index 69170b4f..6007fb90 100644
--- a/server/src/test/resources/confTest.conf
+++ b/server/src/test/resources/confTest.conf
@@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1
rss.server.had.a.b value2
rss.server.multistorage.enable true
rss.server.cluster.hadoop.clustere1.
+plugin.custom.key v1