This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b8038cb2 [#864] feat(server): Introduce Jersey to strengthen REST API
(#939)
b8038cb2 is described below
commit b8038cb299997353c559f9f45ba2ea888fa72fa4
Author: xianjingfeng <[email protected]>
AuthorDate: Sat Jul 8 00:11:06 2023 +0800
[#864] feat(server): Introduce Jersey to strengthen REST API (#939)
### What changes were proposed in this pull request?
Introduce Jersey to strengthen REST API.
To use a new version of Jersey and being compatible with Hadoop 2.x, we
have to use a shaded version of Jersery.
However Uniffle doesn't do binary release currently, we choose to introduce
hbase's one as a temporary solution.
todo: release a shaded Jersey package for Uniffle's internal use or migrate
to Graddle.
### Why are the changes needed?
Fix: #864
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added UT.
---
common/pom.xml | 4 +
.../uniffle/common/web/CommonMetricsServlet.java | 111 ---------------------
.../uniffle/common/web/JerseyAutoDiscoverable.java | 35 +++++++
.../org/apache/uniffle/common/web/JettyServer.java | 32 ++++--
.../apache/uniffle/common/web/JsonConverter.java | 75 ++++++++++++++
.../common/web/resource/BaseMetricResource.java | 36 +++++++
.../common/web/resource/MetricResource.java | 75 ++++++++++++++
.../web/resource/PrometheusMetricResource.java | 58 +++++++++++
....glassfish.jersey.internal.spi.AutoDiscoverable | 1 +
.../apache/uniffle/common/metrics/TestUtils.java | 1 +
.../uniffle/coordinator/CoordinatorServer.java | 56 +++--------
.../coordinator/web/resource/APIResource.java | 36 +++++++
.../AdminResource.java} | 42 +++++---
.../coordinator/web/resource/ServerResource.java | 108 ++++++++++++++++++++
.../coordinator/web/servlet/BaseServlet.java | 84 ----------------
.../web/servlet/CancelDecommissionServlet.java | 50 ----------
.../web/servlet/DecommissionServlet.java | 50 ----------
.../coordinator/web/servlet/NodesServlet.java | 64 ------------
.../coordinator/metric/CoordinatorMetricsTest.java | 10 ++
pom.xml | 8 +-
.../org/apache/uniffle/server/ShuffleServer.java | 29 ++----
21 files changed, 514 insertions(+), 451 deletions(-)
diff --git a/common/pom.xml b/common/pom.xml
index daf590b9..dc20d6c9 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -145,6 +145,10 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase.thirdparty</groupId>
+ <artifactId>hbase-shaded-jersey</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/CommonMetricsServlet.java
b/common/src/main/java/org/apache/uniffle/common/web/CommonMetricsServlet.java
deleted file mode 100644
index e7511349..00000000
---
a/common/src/main/java/org/apache/uniffle/common/web/CommonMetricsServlet.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.common.web;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.MetricsServlet;
-
-public class CommonMetricsServlet extends MetricsServlet {
-
- final boolean isPrometheus;
- private CollectorRegistry registry;
-
- public CommonMetricsServlet(CollectorRegistry registry) {
- this(registry, false);
- }
-
- public CommonMetricsServlet(CollectorRegistry registry, boolean
isPrometheus) {
- super(registry);
- this.registry = registry;
- this.isPrometheus = isPrometheus;
- }
-
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- if (isPrometheus) {
- super.doGet(req, resp);
- } else {
- resp.setStatus(200);
- resp.setContentType("text/plain; version=0.0.4; charset=utf-8");
-
- try (BufferedWriter writer = new BufferedWriter(resp.getWriter())) {
- toJson(writer, getSamples(req));
- writer.flush();
- }
- }
- }
-
- private Set<String> parse(HttpServletRequest req) {
- String[] includedParam = req.getParameterValues("name[]");
- return includedParam == null ? Collections.emptySet() : new
HashSet<>(Arrays.asList(includedParam));
- }
-
- private Enumeration<Collector.MetricFamilySamples>
getSamples(HttpServletRequest req) {
- return this.registry.filteredMetricFamilySamples(this.parse(req));
- }
-
- public void toJson(Writer writer, Enumeration<Collector.MetricFamilySamples>
mfs) throws IOException {
-
- List<Collector.MetricFamilySamples.Sample> metrics = new LinkedList<>();
- while (mfs.hasMoreElements()) {
- Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
- metrics.addAll(metricFamilySamples.samples);
- }
-
- MetricsJsonObj res = new MetricsJsonObj(metrics,
System.currentTimeMillis());
- ObjectMapper objectMapper = new ObjectMapper();
- String json = objectMapper.writeValueAsString(res);
- writer.write(json);
- }
-
- private static class MetricsJsonObj {
-
- private final List<Collector.MetricFamilySamples.Sample> metrics;
- private final long timeStamp;
-
- MetricsJsonObj(List<Collector.MetricFamilySamples.Sample> metrics, long
timeStamp) {
- this.metrics = metrics;
- this.timeStamp = timeStamp;
- }
-
- public List<Sample> getMetrics() {
- return metrics;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- }
-}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/JerseyAutoDiscoverable.java
b/common/src/main/java/org/apache/uniffle/common/web/JerseyAutoDiscoverable.java
new file mode 100644
index 00000000..e3d371d7
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/web/JerseyAutoDiscoverable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web;
+
+import javax.annotation.Priority;
+
+import org.apache.hbase.thirdparty.javax.ws.rs.core.Configuration;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.FeatureContext;
+import
org.apache.hbase.thirdparty.org.glassfish.jersey.internal.spi.AutoDiscoverable;
+
+@Priority(AutoDiscoverable.DEFAULT_PRIORITY - 100)
+public class JerseyAutoDiscoverable implements AutoDiscoverable {
+ @Override
+ public void configure(FeatureContext context) {
+ Configuration config = context.getConfiguration();
+ if (!config.isRegistered(JsonConverter.class)) {
+ context.register(JsonConverter.class);
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
index 0065b0a0..a7a04043 100644
--- a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
@@ -22,11 +22,15 @@ import java.net.BindException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javax.servlet.Servlet;
+import
org.apache.hbase.thirdparty.org.glassfish.jersey.server.ServerProperties;
+import
org.apache.hbase.thirdparty.org.glassfish.jersey.servlet.ServletContainer;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -53,6 +57,8 @@ public class JettyServer {
private Server server;
private ServletContextHandler servletContextHandler;
private int httpPort;
+ private ServletHolder servletHolder;
+ private Set<String> reourcePackages = new HashSet<>();
public JettyServer(RssBaseConf conf) throws FileNotFoundException {
createServer(conf);
@@ -79,11 +85,6 @@ public class JettyServer {
}
}
- public void addServlet(Servlet servlet, String pathSpec) {
- servletContextHandler.addServlet(new ServletHolder(servlet), pathSpec);
- server.setHandler(servletContextHandler);
- }
-
private ExecutorThreadPool createThreadPool(RssBaseConf conf) {
int corePoolSize = conf.getInteger(RssBaseConf.JETTY_CORE_POOL_SIZE);
int maxPoolSize = conf.getInteger(RssBaseConf.JETTY_MAX_POOL_SIZE);
@@ -133,6 +134,21 @@ public class JettyServer {
servletContextHandler = new ServletContextHandler();
servletContextHandler.setContextPath("/");
server.setHandler(servletContextHandler);
+ servletHolder = servletContextHandler.addServlet(ServletContainer.class,
"/*");
+ }
+
+ public void addResourcePackages(String... packages) {
+ reourcePackages.addAll(Arrays.asList(packages));
+ servletHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES,
+ String.join(",", reourcePackages));
+ }
+
+ public void registerInstance(Class<?> clazz, Object instance) {
+ registerInstance(clazz.getCanonicalName(), instance);
+ }
+
+ public void registerInstance(String name, Object instance) {
+ servletContextHandler.setAttribute(name, instance);
}
public Server getServer() {
@@ -151,8 +167,4 @@ public class JettyServer {
public void stop() throws Exception {
server.stop();
}
-
- public ServletContextHandler getServletContextHandler() {
- return this.servletContextHandler;
- }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/JsonConverter.java
b/common/src/main/java/org/apache/uniffle/common/web/JsonConverter.java
new file mode 100644
index 00000000..b52f0559
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/web/JsonConverter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
+import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
+import org.apache.hbase.thirdparty.javax.ws.rs.WebApplicationException;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MultivaluedMap;
+import org.apache.hbase.thirdparty.javax.ws.rs.ext.MessageBodyReader;
+import org.apache.hbase.thirdparty.javax.ws.rs.ext.MessageBodyWriter;
+import org.apache.hbase.thirdparty.javax.ws.rs.ext.Provider;
+
+@Provider
+@Consumes({MediaType.APPLICATION_JSON, "text/json", "application/*+json"})
+@Produces({MediaType.APPLICATION_JSON, "text/json", "application/*+json"})
+public class JsonConverter implements MessageBodyReader<Object>,
MessageBodyWriter<Object> {
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public boolean isWriteable(Class<?> type, Type genericType, Annotation[]
annotations,
+ MediaType mediaType) {
+ return true;
+ }
+
+ @Override
+ public void writeTo(Object t, Class<?> type, Type genericType, Annotation[]
annotations,
+ MediaType mediaType, MultivaluedMap<String, Object>
httpHeaders,
+ OutputStream entityStream) throws IOException,
WebApplicationException {
+ objectMapper.writeValue(new OutputStreamWriter(entityStream,
Charset.defaultCharset()), t);
+ }
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType, Annotation[]
annotations,
+ MediaType mediaType) {
+ return true;
+ }
+
+ @Override
+ public Object readFrom(Class<Object> type, Type genericType, Annotation[]
annotations,
+ MediaType mediaType, MultivaluedMap<String, String>
httpHeaders,
+ InputStream entityStream) throws IOException,
WebApplicationException {
+ return objectMapper.readValue(entityStream, type);
+ }
+
+ @Override
+ public long getSize(Object t, Class<?> type, Type genericType, Annotation[]
annotations,
+ MediaType mediaType) {
+ return -1;
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/resource/BaseMetricResource.java
b/common/src/main/java/org/apache/uniffle/common/web/resource/BaseMetricResource.java
new file mode 100644
index 00000000..66351131
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/web/resource/BaseMetricResource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web.resource;
+
+import javax.servlet.ServletContext;
+
+import io.prometheus.client.CollectorRegistry;
+
+import org.apache.uniffle.common.exception.InvalidRequestException;
+
+public abstract class BaseMetricResource {
+
+ protected CollectorRegistry getCollectorRegistry(ServletContext
servletContext, String type) {
+ CollectorRegistry registry = (CollectorRegistry)
servletContext.getAttribute(
+ CollectorRegistry.class.getCanonicalName() + "#" + type);
+ if (registry == null) {
+ throw new InvalidRequestException(String.format("Metric type[%s] not
supported", type));
+ }
+ return registry;
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java
b/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java
new file mode 100644
index 00000000..9979e6ed
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web.resource;
+
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import javax.servlet.ServletContext;
+
+import io.prometheus.client.Collector;
+import org.apache.hbase.thirdparty.javax.ws.rs.GET;
+import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
+import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
+
+@Path("/metrics")
+public class MetricResource extends BaseMetricResource {
+ @Context
+ protected ServletContext servletContext;
+
+ @GET
+ @Path("/{type}")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public MetricsJsonObj metrics(
+ @PathParam("type") String type,
+ @QueryParam("name[]") Set<String> names) {
+ Enumeration<Collector.MetricFamilySamples> mfs =
+ getCollectorRegistry(servletContext,
type).filteredMetricFamilySamples(names);
+ List<Collector.MetricFamilySamples.Sample> metrics = new LinkedList<>();
+ while (mfs.hasMoreElements()) {
+ Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
+ metrics.addAll(metricFamilySamples.samples);
+ }
+ return new MetricsJsonObj(metrics, System.currentTimeMillis());
+ }
+
+ private static class MetricsJsonObj {
+
+ private final List<Collector.MetricFamilySamples.Sample> metrics;
+ private final long timeStamp;
+
+ MetricsJsonObj(List<Collector.MetricFamilySamples.Sample> metrics, long
timeStamp) {
+ this.metrics = metrics;
+ this.timeStamp = timeStamp;
+ }
+
+ public List<Collector.MetricFamilySamples.Sample> getMetrics() {
+ return metrics;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java
b/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java
new file mode 100644
index 00000000..3bfa64f6
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web.resource;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Set;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletResponse;
+
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.hbase.thirdparty.javax.ws.rs.GET;
+import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
+
+@Path("/prometheus/metrics")
+public class PrometheusMetricResource extends BaseMetricResource {
+ @Context
+ private HttpServletResponse httpServletResponse;
+ @Context
+ protected ServletContext servletContext;
+
+ @GET
+ @Path("/{type}")
+ public String metrics(
+ @PathParam("type") String type,
+ @QueryParam("name[]") Set<String> names) throws IOException {
+ httpServletResponse.setStatus(200);
+ httpServletResponse.setContentType("text/plain; version=0.0.4;
charset=utf-8");
+ Writer writer = new BufferedWriter(httpServletResponse.getWriter());
+
+ try {
+ TextFormat.write004(writer, getCollectorRegistry(servletContext,
type).filteredMetricFamilySamples(names));
+ writer.flush();
+ } finally {
+ writer.close();
+ }
+ return null;
+ }
+}
diff --git
a/common/src/main/resources/META-INF/services/org.apache.hbase.thirdparty.org.glassfish.jersey.internal.spi.AutoDiscoverable
b/common/src/main/resources/META-INF/services/org.apache.hbase.thirdparty.org.glassfish.jersey.internal.spi.AutoDiscoverable
new file mode 100644
index 00000000..a36c7e1a
--- /dev/null
+++
b/common/src/main/resources/META-INF/services/org.apache.hbase.thirdparty.org.glassfish.jersey.internal.spi.AutoDiscoverable
@@ -0,0 +1 @@
+org.apache.uniffle.common.web.JerseyAutoDiscoverable
diff --git
a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
index ea623ea5..b0f6f483 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
@@ -49,6 +49,7 @@ public class TestUtils {
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setDoOutput(true);
con.setRequestMethod("POST");
+ con.setRequestProperty("Content-type", "application/json");
StringBuilder content = new StringBuilder();
try (OutputStream outputStream = con.getOutputStream();) {
outputStream.write(postData.getBytes());
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 286ea179..9c32dc68 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -35,17 +35,12 @@ import org.apache.uniffle.common.rpc.ServerInterface;
import org.apache.uniffle.common.security.SecurityConfig;
import org.apache.uniffle.common.security.SecurityContextFactory;
import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
-import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet;
-import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet;
-import org.apache.uniffle.coordinator.web.servlet.NodesServlet;
-import org.apache.uniffle.coordinator.web.servlet.admin.RefreshCheckerServlet;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -157,8 +152,6 @@ public class CoordinatorServer extends ReconfigurableBase {
int port = coordinatorConf.getInteger(CoordinatorConf.RPC_SERVER_PORT);
id = ip + "-" + port;
LOG.info("Start to initialize coordinator {}", id);
- jettyServer = new JettyServer(coordinatorConf);
- registerRESTAPI();
// register metrics first to avoid NPE problem when add dynamic metrics
registerMetrics();
coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
@@ -175,7 +168,6 @@ public class CoordinatorServer extends ReconfigurableBase {
}
SecurityContextFactory.get().init(securityConfig);
-
// load default hadoop configuration
Configuration hadoopConf = new Configuration();
ClusterManagerFactory clusterManagerFactory = new
ClusterManagerFactory(coordinatorConf, hadoopConf);
@@ -189,22 +181,18 @@ public class CoordinatorServer extends ReconfigurableBase
{
applicationManager.getQuotaManager(), hadoopConf);
CoordinatorFactory coordinatorFactory = new CoordinatorFactory(this);
server = coordinatorFactory.getServer();
- }
-
- private void registerRESTAPI() throws Exception {
- LOG.info("Register REST API");
- jettyServer.addServlet(
- new NodesServlet(this),
- "/api/server/nodes");
- jettyServer.addServlet(
- new DecommissionServlet(this),
- "/api/server/decommission");
- jettyServer.addServlet(
- new CancelDecommissionServlet(this),
- "/api/server/cancelDecommission");
- jettyServer.addServlet(
- new RefreshCheckerServlet(this),
- "/api/admin/refreshChecker");
+ jettyServer = new JettyServer(coordinatorConf);
+ // register packages and instances for jersey
+
jettyServer.addResourcePackages("org.apache.uniffle.coordinator.web.resource",
+ "org.apache.uniffle.common.web.resource");
+ jettyServer.registerInstance(ClusterManager.class, clusterManager);
+ jettyServer.registerInstance(AccessManager.class, accessManager);
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#server",
+ CoordinatorMetrics.getCollectorRegistry());
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#grpc",
+ grpcMetrics.getCollectorRegistry());
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#jvm",
+ JvmMetrics.getCollectorRegistry());
}
private void registerMetrics() throws Exception {
@@ -217,26 +205,6 @@ public class CoordinatorServer extends ReconfigurableBase {
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
JvmMetrics.register(jvmCollectorRegistry, verbose);
- LOG.info("Add metrics servlet");
- jettyServer.addServlet(
- new CommonMetricsServlet(CoordinatorMetrics.getCollectorRegistry()),
- "/metrics/server");
- jettyServer.addServlet(
- new CommonMetricsServlet(grpcMetrics.getCollectorRegistry()),
- "/metrics/grpc");
- jettyServer.addServlet(
- new CommonMetricsServlet(JvmMetrics.getCollectorRegistry()),
- "/metrics/jvm");
- jettyServer.addServlet(
- new CommonMetricsServlet(CoordinatorMetrics.getCollectorRegistry(),
true),
- "/prometheus/metrics/server");
- jettyServer.addServlet(
- new CommonMetricsServlet(grpcMetrics.getCollectorRegistry(), true),
- "/prometheus/metrics/grpc");
- jettyServer.addServlet(
- new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
- "/prometheus/metrics/jvm");
-
metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf,
id);
if (metricReporter != null) {
metricReporter.addCollectorRegistry(CoordinatorMetrics.getCollectorRegistry());
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java
new file mode 100644
index 00000000..a948ad28
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.resource;
+
+import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
+
+@Path("api")
+@Produces({ MediaType.APPLICATION_JSON })
+public class APIResource {
+ @Path("server")
+ public Class<ServerResource> getServerResource() {
+ return ServerResource.class;
+ }
+
+ @Path("admin")
+ public Class<AdminResource> getAdminResource() {
+ return AdminResource.class;
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
similarity index 55%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
index a61f1976..6cb773e5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
@@ -15,36 +15,46 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator.web.servlet.admin;
+package org.apache.uniffle.coordinator.web.resource;
import java.util.List;
+import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import org.apache.hbase.thirdparty.javax.ws.rs.GET;
+import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.coordinator.web.Response;
-import org.apache.uniffle.coordinator.web.servlet.BaseServlet;
-public class RefreshCheckerServlet extends BaseServlet {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RefreshCheckerServlet.class);
- private final CoordinatorServer coordinator;
-
- public RefreshCheckerServlet(CoordinatorServer coordinator) {
- this.coordinator = coordinator;
- }
-
- @Override
- protected Response handleGet(HttpServletRequest req, HttpServletResponse
resp) {
- List<AccessChecker> accessCheckers =
coordinator.getAccessManager().getAccessCheckers();
+@Produces({MediaType.APPLICATION_JSON})
+public class AdminResource {
+ private static final Logger LOG =
LoggerFactory.getLogger(AdminResource.class);
+ @Context
+ private HttpServletRequest httpRequest;
+ @Context
+ protected ServletContext servletContext;
+
+ @GET
+ @Path("/refreshChecker")
+ public Response<List<ServerNode>> refreshChecker() {
+ List<AccessChecker> accessCheckers =
getAccessManager().getAccessCheckers();
LOG.info(
"The access checker {} has been refreshed, you can add the checker via
rss.coordinator.access.checkers.",
accessCheckers);
accessCheckers.forEach(AccessChecker::refreshAccessChecker);
return Response.success(null);
}
+
+ private AccessManager getAccessManager() {
+ return (AccessManager) servletContext.getAttribute(
+ AccessManager.class.getCanonicalName());
+ }
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
new file mode 100644
index 00000000..776cd286
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.resource;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.servlet.ServletContext;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hbase.thirdparty.javax.ws.rs.GET;
+import org.apache.hbase.thirdparty.javax.ws.rs.POST;
+import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
+import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
+
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+
+@Produces({ MediaType.APPLICATION_JSON })
+public class ServerResource {
+ @Context
+ protected ServletContext servletContext;
+
+ @GET
+ @Path("/nodes")
+ public Response<List<ServerNode>> nodes(@QueryParam("id") String id,
@QueryParam("status") String status) {
+ ClusterManager clusterManager = getClusterManager();
+ List<ServerNode> serverList;
+ if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
+ serverList = clusterManager.getUnhealthyServerList();
+ } else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) {
+ serverList = clusterManager.getLostServerList();
+ } else {
+ serverList = clusterManager.getServerList(Collections.emptySet());
+ }
+ serverList = serverList.stream().filter(server -> {
+ if (id != null && !id.equals(server.getId())) {
+ return false;
+ }
+ if (status != null && !server.getStatus().toString().equals(status)) {
+ return false;
+ }
+ return true;
+ }).collect(Collectors.toList());
+ serverList.sort(Comparator.comparing(ServerNode::getId));
+ return Response.success(serverList);
+ }
+
+ @POST
+ @Path("/cancelDecommission")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public Response<Object> cancelDecommission(CancelDecommissionRequest params)
{
+ if (CollectionUtils.isEmpty(params.getServerIds())) {
+ return Response.fail("Parameter[serverIds] should not be null!");
+ }
+ ClusterManager clusterManager = getClusterManager();
+ try {
+ params.getServerIds().forEach(clusterManager::cancelDecommission);
+ } catch (Exception e) {
+ return Response.fail(e.getMessage());
+ }
+ return Response.success(null);
+ }
+
+ @POST
+ @Path("/decommission")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public Response<Object> decommission(DecommissionRequest params) {
+ if (CollectionUtils.isEmpty(params.getServerIds())) {
+ return Response.fail("Parameter[serverIds] should not be null!");
+ }
+ ClusterManager clusterManager = getClusterManager();
+ try {
+ params.getServerIds().forEach(clusterManager::decommission);
+ } catch (Exception e) {
+ return Response.fail(e.getMessage());
+ }
+ return Response.success(null);
+ }
+
+ private ClusterManager getClusterManager() {
+ return (ClusterManager) servletContext.getAttribute(
+ ClusterManager.class.getCanonicalName());
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
deleted file mode 100644
index 99948dae..00000000
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator.web.servlet;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.Callable;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.uniffle.coordinator.web.Response;
-
-public abstract class BaseServlet<T> extends HttpServlet {
- public static final String JSON_MIME_TYPE = "application/json";
- final ObjectMapper mapper = new
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
- writeJSON(resp, handlerRequest(() -> handleGet(req, resp)));
- }
-
- @Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
- writeJSON(resp, handlerRequest(() -> handlePost(req, resp)));
- }
-
- private Response<T> handlerRequest(
- Callable<Response<T>> function) {
- Response<T> response;
- try {
- // todo: Do something for authentication
- response = function.call();
- } catch (Exception e) {
- response = Response.fail(e.getMessage());
- }
- return response;
- }
-
- protected Response<T> handleGet(
- HttpServletRequest req,
- HttpServletResponse resp) throws ServletException, IOException {
- throw new IOException("Method not support!");
- }
-
- protected Response<T> handlePost(
- HttpServletRequest req,
- HttpServletResponse resp) throws ServletException, IOException {
- throw new IOException("Method not support!");
- }
-
- protected void writeJSON(final HttpServletResponse resp, final Object obj)
- throws IOException {
- if (obj == null) {
- return;
- }
- resp.setContentType(JSON_MIME_TYPE);
- final OutputStream stream = resp.getOutputStream();
- mapper.writeValue(stream, obj);
- }
-
- protected <T> T parseParamsFromJson(HttpServletRequest req, Class<T> clazz)
throws IOException {
- return mapper.readValue(req.getInputStream(), clazz);
- }
-}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
deleted file mode 100644
index 24c77f8c..00000000
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator.web.servlet;
-
-import java.io.IOException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.collections.CollectionUtils;
-
-import org.apache.uniffle.coordinator.ClusterManager;
-import org.apache.uniffle.coordinator.CoordinatorServer;
-import org.apache.uniffle.coordinator.web.Response;
-import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
-
-public class CancelDecommissionServlet extends BaseServlet<Object> {
- private final CoordinatorServer coordinator;
-
- public CancelDecommissionServlet(CoordinatorServer coordinator) {
- this.coordinator = coordinator;
- }
-
- @Override
- protected Response<Object> handlePost(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
- CancelDecommissionRequest params = parseParamsFromJson(req,
CancelDecommissionRequest.class);
- if (CollectionUtils.isEmpty(params.getServerIds())) {
- return Response.fail("Parameter[serverIds] should not be null!");
- }
- ClusterManager clusterManager = coordinator.getClusterManager();
- params.getServerIds().forEach((serverId) -> {
- clusterManager.cancelDecommission(serverId);
- });
- return Response.success(null);
- }
-}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
deleted file mode 100644
index 3f3ab1ef..00000000
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator.web.servlet;
-
-import java.io.IOException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.collections.CollectionUtils;
-
-import org.apache.uniffle.coordinator.ClusterManager;
-import org.apache.uniffle.coordinator.CoordinatorServer;
-import org.apache.uniffle.coordinator.web.Response;
-import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
-
-public class DecommissionServlet extends BaseServlet<Object> {
- private final CoordinatorServer coordinator;
-
- public DecommissionServlet(CoordinatorServer coordinator) {
- this.coordinator = coordinator;
- }
-
- @Override
- protected Response<Object> handlePost(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
- DecommissionRequest params = parseParamsFromJson(req,
DecommissionRequest.class);
- if (CollectionUtils.isEmpty(params.getServerIds())) {
- return Response.fail("Parameter[serverIds] should not be null!");
- }
- ClusterManager clusterManager = coordinator.getClusterManager();
- params.getServerIds().forEach((serverId) -> {
- clusterManager.decommission(serverId);
- });
- return Response.success(null);
- }
-}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
deleted file mode 100644
index ea073cad..00000000
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator.web.servlet;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.coordinator.CoordinatorServer;
-import org.apache.uniffle.coordinator.ServerNode;
-import org.apache.uniffle.coordinator.web.Response;
-
-
-public class NodesServlet extends BaseServlet<List<ServerNode>> {
- private final CoordinatorServer coordinator;
-
- public NodesServlet(CoordinatorServer coordinator) {
- this.coordinator = coordinator;
- }
-
- @Override
- protected Response handleGet(HttpServletRequest req, HttpServletResponse
resp) {
- List<ServerNode> serverList;
- String id = req.getParameter("id");
- String status = req.getParameter("status");
- if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
- serverList = coordinator.getClusterManager().getUnhealthyServerList();
- } else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) {
- serverList = coordinator.getClusterManager().getLostServerList();
- } else {
- serverList =
coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
- }
- serverList = serverList.stream().filter((server) -> {
- if (id != null && !id.equals(server.getId())) {
- return false;
- }
- if (status != null && !server.getStatus().toString().equals(status)) {
- return false;
- }
- return true;
- }).collect(Collectors.toList());
- Collections.sort(serverList, Comparator.comparing(ServerNode::getId));
- return Response.success(serverList);
- }
-}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index 454e6bb8..d0ebae2f 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -99,6 +99,16 @@ public class CoordinatorMetricsTest {
assertEquals(10, actualMetrics);
}
+ @Test
+ public void testCoordinatorMetricsWithNames() throws Exception {
+ String content = TestUtils.httpGet(SERVER_METRICS_URL
+ + "?name[]=total_app_num&name[]=running_app_num");
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(content);
+ assertEquals(2, actualObj.size());
+ assertEquals(2, actualObj.get("metrics").size());
+ }
+
@Test
public void testJvmMetrics() throws Exception {
String content = TestUtils.httpGet(SERVER_JVM_URL);
diff --git a/pom.xml b/pom.xml
index 64704b8f..ac908b60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
<java.version>1.8</java.version>
<javax.annotation.version>1.3.2</javax.annotation.version>
<jetty.version>9.3.24.v20180605</jetty.version>
+ <hbase.thirdparty.version>4.1.4</hbase.thirdparty.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<junit.platform.version>1.8.2</junit.platform.version>
<system.stubs.version>2.0.1</system.stubs.version>
@@ -693,7 +694,11 @@
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.hbase.thirdparty</groupId>
+ <artifactId>hbase-shaded-jersey</artifactId>
+ <version>${hbase.thirdparty.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -1022,6 +1027,7 @@
<exclude>DISCLAIMER-WIP</exclude>
<exclude>NOTICE</exclude>
<exclude>**/target/**</exclude>
+ <exclude>**/src/main/resources/META-INF/services/*</exclude>
<exclude>src/test/resources/empty</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>.github/PULL_REQUEST_TEMPLATE</exclude>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 13a44009..9b5eadfd 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -49,7 +49,6 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
-import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.netty.StreamServer;
@@ -207,6 +206,14 @@ public class ShuffleServer {
jettyServer = new JettyServer(shuffleServerConf);
registerMetrics();
+ // register packages and instances for jersey
+ jettyServer.addResourcePackages("org.apache.uniffle.common.web.resource");
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#server",
+ ShuffleServerMetrics.getCollectorRegistry());
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#grpc",
+ grpcMetrics.getCollectorRegistry());
+ jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() +
"#jvm",
+ JvmMetrics.getCollectorRegistry());
SecurityConfig securityConfig = null;
if (shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
@@ -275,26 +282,6 @@ public class ShuffleServer {
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
shuffleServerConf.getBoolean(ShuffleServerConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
JvmMetrics.register(jvmCollectorRegistry, verbose);
-
- LOG.info("Add metrics servlet");
- jettyServer.addServlet(
- new CommonMetricsServlet(ShuffleServerMetrics.getCollectorRegistry()),
- "/metrics/server");
- jettyServer.addServlet(
- new CommonMetricsServlet(grpcMetrics.getCollectorRegistry()),
- "/metrics/grpc");
- jettyServer.addServlet(
- new CommonMetricsServlet(JvmMetrics.getCollectorRegistry()),
- "/metrics/jvm");
- jettyServer.addServlet(
- new CommonMetricsServlet(ShuffleServerMetrics.getCollectorRegistry(),
true),
- "/prometheus/metrics/server");
- jettyServer.addServlet(
- new CommonMetricsServlet(grpcMetrics.getCollectorRegistry(), true),
- "/prometheus/metrics/grpc");
- jettyServer.addServlet(
- new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
- "/prometheus/metrics/jvm");
}
private void initMetricsReporter() throws Exception {