This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/logging-flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9ac63142 Move monitoring services to flume-ng-instrumentation modules
(#445)
b9ac63142 is described below
commit b9ac63142a1a75f233b98bde4a274a8e395fe6e4
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Thu Jun 11 15:47:02 2026 +0200
Move monitoring services to flume-ng-instrumentation modules (#445)
* Move HTTPSource to its own flume-http-source module
Keeping HTTPSource in flume-ng-core forced the heavy Jetty/Gson HTTP
stack onto every core consumer. Extract it into a dedicated optional
source module under flume-ng-sources, like taildir, so it ships only
when needed.
The package (org.apache.flume.source.http) is unchanged, so the
SourceType.HTTP reflective mapping and existing "http" configs keep
working. Core retains Jetty/Gson for its metrics server and
HTTPServerConstraintUtil.
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
* Make flume-http-source self-contained for HTTP constraints
HTTPSource was reaching into flume-ng-core for HTTPServerConstraintUtil.
Move that helper into the module next to its only caller and make it
package-private, so the HTTP source no longer depends on a core internal
for its Jetty constraint handling. Declare jetty-security directly here
(managed in flume-parent) since the helper uses it.
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
* Move monitoring services to flume-ng-instrumentation modules
Split the Ganglia, HTTP and Prometheus MonitorService implementations
out of flume-ng-core into three modules under a new flume-ng-instrumentation
parent, each in its own package so there are no split packages.
Replace the hardcoded MonitoringType enum with ServiceLoader discovery:
MonitorService gains a getType() default method, each provider declares
itself via META-INF/services, and the node selects one by matching
flume.monitoring.type against getType() (FQCN fallback preserved). The
modules are wired through the BOM and bundled by flume-ng-dist only.
Core keeps the MonitorService interface and JMXPollUtil, and drops the now
unused gson and prometheus dependencies.
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
* Declare gson dependency in flume-taildir-source
The taildir source uses gson for its position file but was getting it
transitively from flume-ng-core, which no longer provides it after the
monitoring split. Without an explicit dependency the position file
handling fails at runtime with NoClassDefFoundError.
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
* Upgrade Prometheus and Jetty
* Fix Prometheus
* Migrate flume-http-source to Jetty 12 and jakarta.servlet
Finish the Prometheus and Jetty upgrade for the modules split out of
flume-ng-core: port HTTPSource and its handlers and tests from
javax.servlet / Jetty 9 to jakarta.servlet / Jetty 12 (ee11), and align
the flume-http-monitor, flume-prometheus-monitor and flume-http-source
poms with the new artifacts.
The HTTPS tests now generate their TLS material with Bouncy Castle (the
server keystore in a temporary folder, the client truststore in memory)
instead of a checked-in keystore, keeping Jetty 12's SNI host checking
enabled.
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Ralph Goers <[email protected]>
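
The ServiceLoader-based selection described in the commit message (match flume.monitoring.type against getType(), then fall back to a fully qualified class name) can be sketched roughly as follows. This is a minimal illustration, not the code in Application.java: the MonitorService interface is a simplified stand-in, FakeHttpMonitor, LegacyMonitor and select() are hypothetical names, and the case-insensitive comparison is an assumption. In the real node the providers would presumably come from ServiceLoader.load(MonitorService.class); a fixed list is used here so the sketch runs without META-INF/services entries.

```java
import java.util.List;
import java.util.Optional;

public class MonitorSelectionSketch {

  /** Simplified stand-in for org.apache.flume.instrumentation.MonitorService. */
  public interface MonitorService {
    void start();

    void stop();

    /** Type name matched against flume.monitoring.type; null means "not selectable by type". */
    default String getType() {
      return null;
    }
  }

  /** Hypothetical provider that announces itself as "http". */
  public static class FakeHttpMonitor implements MonitorService {
    @Override public void start() { }

    @Override public void stop() { }

    @Override public String getType() {
      return "http";
    }
  }

  /** Hypothetical provider that keeps the default getType() and is only reachable by class name. */
  public static class LegacyMonitor implements MonitorService {
    @Override public void start() { }

    @Override public void stop() { }
  }

  /** Picks a provider by type name, then falls back to treating the value as a class name. */
  public static Optional<MonitorService> select(String type, Iterable<MonitorService> providers) {
    for (MonitorService candidate : providers) {
      String candidateType = candidate.getType();
      // Case-insensitive comparison is an assumption of this sketch.
      if (candidateType != null && candidateType.equalsIgnoreCase(type)) {
        return Optional.of(candidate);
      }
    }
    // FQCN fallback: instantiate the named class if it implements MonitorService.
    try {
      Class<? extends MonitorService> klass =
          Class.forName(type).asSubclass(MonitorService.class);
      return Optional.of(klass.getDeclaredConstructor().newInstance());
    } catch (ReflectiveOperationException | ClassCastException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    // A fixed provider list stands in for ServiceLoader discovery.
    List<MonitorService> providers = List.of(new FakeHttpMonitor(), new LegacyMonitor());
    System.out.println(select("http", providers).isPresent());
    System.out.println(select(LegacyMonitor.class.getName(), providers).isPresent());
    System.out.println(select("nosuch", providers).isPresent());
  }
}
```

Returning null from the default getType() lets existing third-party implementations compile unchanged while remaining reachable through the class-name path.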
---
flume-bom/pom.xml | 20 ++
flume-ng-core/pom.xml | 19 +-
.../flume/instrumentation/MonitorService.java | 16 +-
.../flume/source/TestDefaultSourceFactory.java | 1 -
flume-ng-dist/pom.xml | 16 ++
.../flume-ganglia-monitor}/pom.xml | 27 +--
.../instrumentation/ganglia}/GangliaServer.java | 8 +-
...org.apache.flume.instrumentation.MonitorService | 1 +
.../flume-http-monitor}/pom.xml | 43 +++--
.../instrumentation/http/HTTPMetricsServer.java | 74 ++++----
...org.apache.flume.instrumentation.MonitorService | 1 +
.../instrumentation/http/BaseHTTPMetricsTest.java | 0
.../http/TestHTTPMetricsServer.java | 2 +-
.../flume/instrumentation/util/JMXTestUtils.java | 29 ++-
.../flume-prometheus-monitor}/pom.xml | 47 +++--
.../prometheus}/PrometheusHTTPMetricsServer.java | 207 +++++++++------------
...org.apache.flume.instrumentation.MonitorService | 1 +
.../prometheus}/BaseHTTPMetricsTest.java | 2 +-
.../prometheus}/TestPrometheusMetricsServer.java | 26 +--
.../pom.xml | 51 ++---
.../java/org/apache/flume/node/Application.java | 31 +--
.../flume/node/TestHttpConfigurationSource.java | 12 +-
flume-ng-sources/flume-http-source/pom.xml | 21 ++-
.../org/apache/flume/source/http/BLOBHandler.java | 2 +-
.../source/http/HTTPServerConstraintUtil.java | 36 ++--
.../org/apache/flume/source/http/HTTPSource.java | 12 +-
.../flume/source/http/HTTPSourceHandler.java | 2 +-
.../org/apache/flume/source/http/JSONHandler.java | 2 +-
.../http/FlumeHttpServletRequestWrapper.java | 54 +++---
.../apache/flume/source/http/TestBLOBHandler.java | 6 +-
.../apache/flume/source/http/TestHTTPSource.java | 89 +++++----
.../apache/flume/source/http/TestJSONHandler.java | 2 +-
.../apache/flume/source/http/X509Certificates.java | 95 ++++++++++
.../src/test/resources/jettykeystore | Bin 1355 -> 0 bytes
flume-ng-sources/flume-taildir-source/pom.xml | 5 +
flume-parent/pom.xml | 30 ++-
pom.xml | 1 +
37 files changed, 572 insertions(+), 419 deletions(-)
diff --git a/flume-bom/pom.xml b/flume-bom/pom.xml
index 5e1dec6f3..774b2cab9 100644
--- a/flume-bom/pom.xml
+++ b/flume-bom/pom.xml
@@ -187,6 +187,26 @@
<artifactId>flume-scribe-source</artifactId>
<version>${flume-scribe.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-ganglia-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-http-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-prometheus-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sources</groupId>
+ <artifactId>flume-http-source</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-http-source</artifactId>
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 2c1ee40fa..f73b9627b 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -132,8 +132,8 @@
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty.ee11</groupId>
+ <artifactId>jetty-ee11-servlet</artifactId>
</dependency>
<dependency>
@@ -151,11 +151,6 @@
<artifactId>jetty-jmx</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@@ -179,16 +174,6 @@
<artifactId>mina-core</artifactId>
</dependency>
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_servlet</artifactId>
- </dependency>
-
</dependencies>
<build>
<plugins>
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
index d15ff613e..bb1b7d2fe 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
+++
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
@@ -26,7 +26,19 @@ import org.apache.flume.conf.Configurable;
*/
public interface MonitorService extends Configurable {
- public void start();
+ void start();
- public void stop();
+ void stop();
+
+ /**
+ * The configuration type name used to select this monitoring service.
+ *
+ * <p>Implementations discovered through the {@link
java.util.ServiceLoader} are matched against the
+ * {@code flume.monitoring.type} value using this method.</p>
+ *
+ * @return the type name, or {@code null} if this service is not
selectable by type.
+ */
+ default String getType() {
+ return null;
+ }
}
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
index 2947b2c3f..df445b6cc 100644
---
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
+++
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
@@ -62,7 +62,6 @@ public class TestDefaultSourceFactory {
verifySourceCreation("netcat-src", "netcat", NetcatSource.class);
verifySourceCreation("netcat-udp-src", "netcatudp",
NetcatUdpSource.class);
verifySourceCreation("exec-src", "exec", ExecSource.class);
- // verifySourceCreation("avro-src", "avro", AvroSource.class);
verifySourceCreation("syslogtcp-src", "syslogtcp",
SyslogTcpSource.class);
verifySourceCreation("multiport_syslogtcp-src", "multiport_syslogtcp",
MultiportSyslogTCPSource.class);
verifySourceCreation("syslogudp-src", "syslogudp",
SyslogUDPSource.class);
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 92273e741..7bfe27428 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -126,6 +126,22 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-legacy-thrift-source</artifactId>
</dependency>-->
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-ganglia-monitor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-http-monitor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-prometheus-monitor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sources</groupId>
+ <artifactId>flume-http-source</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-http-source</artifactId>
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml
b/flume-ng-instrumentation/flume-ganglia-monitor/pom.xml
similarity index 69%
copy from flume-ng-sources/flume-taildir-source/pom.xml
copy to flume-ng-instrumentation/flume-ganglia-monitor/pom.xml
index ffbc5a059..3262d4272 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-instrumentation/flume-ganglia-monitor/pom.xml
@@ -21,19 +21,18 @@
<parent>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sources</artifactId>
+ <artifactId>flume-ng-instrumentation</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-taildir-source</artifactId>
- <name>Flume Taildir Source</name>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-ganglia-monitor</artifactId>
+ <name>Flume Ganglia Monitor</name>
<properties>
- <!-- TODO fix spotbugs/pmd violations -->
- <spotbugs.maxAllowedViolations>24</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
- <module.name>org.apache.flume.source.taildir</module.name>
+ <!-- TODO fix spotbugs violations -->
+ <spotbugs.maxAllowedViolations>13</spotbugs.maxAllowedViolations>
+ <module.name>org.apache.flume.instrumentation.ganglia</module.name>
</properties>
<dependencies>
@@ -41,12 +40,6 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
@@ -58,12 +51,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
</dependencies>
</project>
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
b/flume-ng-instrumentation/flume-ganglia-monitor/src/main/java/org/apache/flume/instrumentation/ganglia/GangliaServer.java
similarity index 98%
rename from
flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
rename to
flume-ng-instrumentation/flume-ganglia-monitor/src/main/java/org/apache/flume/instrumentation/ganglia/GangliaServer.java
index cf9535930..367557eff 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++
b/flume-ng-instrumentation/flume-ganglia-monitor/src/main/java/org/apache/flume/instrumentation/ganglia/GangliaServer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flume.instrumentation;
+package org.apache.flume.instrumentation.ganglia;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
@@ -32,6 +32,7 @@ import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.api.HostInfo;
import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,6 +135,11 @@ public class GangliaServer implements MonitorService {
offset = 0;
}
+ @Override
+ public String getType() {
+ return "ganglia";
+ }
+
/**
* Start this server, causing it to poll JMX at the configured frequency.
*/
diff --git
a/flume-ng-instrumentation/flume-ganglia-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
b/flume-ng-instrumentation/flume-ganglia-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
new file mode 100644
index 000000000..bacdb80df
--- /dev/null
+++
b/flume-ng-instrumentation/flume-ganglia-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
@@ -0,0 +1 @@
+org.apache.flume.instrumentation.ganglia.GangliaServer
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml
b/flume-ng-instrumentation/flume-http-monitor/pom.xml
similarity index 68%
copy from flume-ng-sources/flume-taildir-source/pom.xml
copy to flume-ng-instrumentation/flume-http-monitor/pom.xml
index ffbc5a059..fdb209cb2 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-instrumentation/flume-http-monitor/pom.xml
@@ -21,19 +21,18 @@
<parent>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sources</artifactId>
+ <artifactId>flume-ng-instrumentation</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-taildir-source</artifactId>
- <name>Flume Taildir Source</name>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-http-monitor</artifactId>
+ <name>Flume HTTP Monitor</name>
<properties>
- <!-- TODO fix spotbugs/pmd violations -->
- <spotbugs.maxAllowedViolations>24</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
- <module.name>org.apache.flume.source.taildir</module.name>
+ <!-- TODO fix spotbugs violations -->
+ <spotbugs.maxAllowedViolations>3</spotbugs.maxAllowedViolations>
+ <module.name>org.apache.flume.instrumentation.http</module.name>
</properties>
<dependencies>
@@ -43,24 +42,32 @@
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
+ <artifactId>flume-ng-core</artifactId>
</dependency>
+
<dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
b/flume-ng-instrumentation/flume-http-monitor/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
similarity index 66%
rename from
flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
rename to
flume-ng-instrumentation/flume-http-monitor/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
index 1fefc5ec7..4272de4ea 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
+++
b/flume-ng-instrumentation/flume-http-monitor/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -18,21 +18,21 @@ package org.apache.flume.instrumentation.http;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import java.io.IOException;
-import java.lang.reflect.Type;
+import jakarta.servlet.http.HttpServletResponse;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +54,11 @@ public class HTTPMetricsServer implements MonitorService {
private static int DEFAULT_PORT = 41414;
public static String CONFIG_PORT = "port";
+ @Override
+ public String getType() {
+ return "http";
+ }
+
@Override
public void start() {
jettyServer = new Server();
@@ -94,45 +99,44 @@ public class HTTPMetricsServer implements MonitorService {
port = context.getInteger(CONFIG_PORT, DEFAULT_PORT);
}
- private class HTTPMetricsHandler extends AbstractHandler {
+ private class HTTPMetricsHandler extends Handler.Abstract {
- Type mapType = new TypeToken<Map<String, Map<String, String>>>()
{}.getType();
+ java.lang.reflect.Type mapType = new TypeToken<Map<String, Map<String,
String>>>() {}.getType();
Gson gson = new Gson();
@Override
- public void handle(String target, Request r1, HttpServletRequest
request, HttpServletResponse response)
- throws IOException, ServletException {
- // /metrics is the only place to pull metrics.
- // If we want to use any other url for something else, we should
make sure
- // that for metrics only /metrics is used to prevent backward
- // compatibility issues.
- if (request.getMethod().equalsIgnoreCase("TRACE")
- || request.getMethod().equalsIgnoreCase("OPTIONS")) {
- response.sendError(HttpServletResponse.SC_FORBIDDEN);
- response.flushBuffer();
- ((Request) request).setHandled(true);
- return;
+ public boolean handle(Request request, Response response, Callback
callback) throws Exception {
+ String method = request.getMethod();
+ String path = Request.getPathInContext(request);
+
+ if (method.equalsIgnoreCase("TRACE") ||
method.equalsIgnoreCase("OPTIONS")) {
+ response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+ callback.succeeded();
+ return true;
}
- if (target.equals("/")) {
- response.setContentType("text/html;charset=utf-8");
+
+ if ("/".equals(path)) {
response.setStatus(HttpServletResponse.SC_OK);
- response.getWriter().write("For Flume metrics please click" +
" <a href = \"./metrics\"> here</a>.");
- response.flushBuffer();
- ((Request) request).setHandled(true);
- return;
- } else if (target.equalsIgnoreCase("/metrics")) {
- response.setContentType("application/json;charset=utf-8");
+ response.getHeaders().put("Content-Type",
"text/html;charset=utf-8");
+ String html = "For Flume metrics please click <a
href=\"./metrics\"> here</a>.";
+ response.write(true,
ByteBuffer.wrap(html.getBytes(StandardCharsets.UTF_8)), callback);
+ return true;
+ }
+
+ if ("/metrics".equalsIgnoreCase(path)) {
response.setStatus(HttpServletResponse.SC_OK);
+ response.getHeaders().put("Content-Type",
"application/json;charset=utf-8");
+
Map<String, Map<String, String>> metricsMap =
JMXPollUtil.getAllMBeans();
String json = gson.toJson(metricsMap, mapType);
- response.getWriter().write(json);
- response.flushBuffer();
- ((Request) request).setHandled(true);
- return;
+
+ response.write(true,
ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)), callback);
+ return true;
}
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
- response.flushBuffer();
- // Not handling the request returns a Not found error page.
+
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ callback.succeeded();
+ return true;
}
}
}
diff --git
a/flume-ng-instrumentation/flume-http-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
b/flume-ng-instrumentation/flume-http-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
new file mode 100644
index 000000000..94fc8860a
--- /dev/null
+++
b/flume-ng-instrumentation/flume-http-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
@@ -0,0 +1 @@
+org.apache.flume.instrumentation.http.HTTPMetricsServer
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
b/flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
similarity index 100%
copy from
flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
copy to
flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
b/flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
similarity index 98%
rename from
flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
rename to
flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
index 3cb5e4bd8..2a603badb 100644
---
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
+++
b/flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -18,13 +18,13 @@ package org.apache.flume.instrumentation.http;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import jakarta.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
-import javax.servlet.http.HttpServletResponse;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXTestUtils;
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
b/flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
similarity index 53%
rename from
flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
rename to
flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
index c0a5dad68..454473431 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
+++
b/flume-ng-instrumentation/flume-http-monitor/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
@@ -14,24 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flume.instrumentation;
+package org.apache.flume.instrumentation.util;
+
+import java.util.Map;
+import org.junit.Assert;
/**
- * Enum for Monitoring types.
+ *
*/
-public enum MonitoringType {
- OTHER(null),
- GANGLIA(org.apache.flume.instrumentation.GangliaServer.class),
- HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class),
-
PROMETHEUS(org.apache.flume.instrumentation.http.PrometheusHTTPMetricsServer.class);
-
- private Class<? extends MonitorService> monitoringClass;
-
- private MonitoringType(Class<? extends MonitorService> klass) {
- this.monitoringClass = klass;
- }
+public class JMXTestUtils {
- public Class<? extends MonitorService> getMonitorClass() {
- return this.monitoringClass;
+ public static void checkChannelCounterParams(Map<String, String> attrs) {
+ Assert.assertNotNull(attrs.get("StartTime"));
+ Assert.assertNotNull(attrs.get("StopTime"));
+ Assert.assertTrue(Long.parseLong(attrs.get("ChannelSize")) != 0);
+ Assert.assertTrue(Long.parseLong(attrs.get("EventPutAttemptCount")) ==
2);
+ Assert.assertTrue(Long.parseLong(attrs.get("EventTakeAttemptCount"))
== 1);
+ Assert.assertTrue(Long.parseLong(attrs.get("EventPutSuccessCount")) ==
2);
+ Assert.assertTrue(Long.parseLong(attrs.get("EventTakeSuccessCount"))
== 1);
}
}
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml
b/flume-ng-instrumentation/flume-prometheus-monitor/pom.xml
similarity index 62%
copy from flume-ng-sources/flume-taildir-source/pom.xml
copy to flume-ng-instrumentation/flume-prometheus-monitor/pom.xml
index ffbc5a059..ed7a8014a 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-instrumentation/flume-prometheus-monitor/pom.xml
@@ -21,19 +21,19 @@
<parent>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sources</artifactId>
+ <artifactId>flume-ng-instrumentation</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-taildir-source</artifactId>
- <name>Flume Taildir Source</name>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-prometheus-monitor</artifactId>
+ <name>Flume Prometheus Monitor</name>
<properties>
<!-- TODO fix spotbugs/pmd violations -->
- <spotbugs.maxAllowedViolations>24</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
- <module.name>org.apache.flume.source.taildir</module.name>
+ <spotbugs.maxAllowedViolations>6</spotbugs.maxAllowedViolations>
+ <pmd.maxAllowedViolations>1</pmd.maxAllowedViolations>
+ <module.name>org.apache.flume.instrumentation.prometheus</module.name>
</properties>
<dependencies>
@@ -43,24 +43,37 @@
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
+ <artifactId>flume-ng-core</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
+ <groupId>org.apache.flume.flume-ng-instrumentation</groupId>
+ <artifactId>flume-http-monitor</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>io.prometheus</groupId>
+ <artifactId>prometheus-metrics-core</artifactId>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <groupId>io.prometheus</groupId>
+ <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.ee11</groupId>
+ <artifactId>jetty-ee11-servlet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
b/flume-ng-instrumentation/flume-prometheus-monitor/src/main/java/org/apache/flume/instrumentation/prometheus/PrometheusHTTPMetricsServer.java
similarity index 51%
rename from
flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
rename to
flume-ng-instrumentation/flume-prometheus-monitor/src/main/java/org/apache/flume/instrumentation/prometheus/PrometheusHTTPMetricsServer.java
index 3512050eb..b243bee6a 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
+++
b/flume-ng-instrumentation/flume-prometheus-monitor/src/main/java/org/apache/flume/instrumentation/prometheus/PrometheusHTTPMetricsServer.java
@@ -14,19 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flume.instrumentation.http;
+package org.apache.flume.instrumentation.prometheus;
import com.google.common.base.Throwables;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import io.prometheus.client.GaugeMetricFamily;
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.metrics.core.metrics.Counter;
+import io.prometheus.metrics.core.metrics.Gauge;
+import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet;
+import io.prometheus.metrics.model.registry.PrometheusRegistry;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -39,39 +37,41 @@ import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
-import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.http.HTTPMetricsServer;
+import org.eclipse.jetty.ee11.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee11.servlet.ServletHolder;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Monitor service implementation that runs a web server on a configurable
- * port and returns the metrics for components in JSON format. <p> Optional
+ * port and returns the metrics for components in Prometheus format. <p>
Optional
* parameters: <p> <tt>port</tt> : The port on which the server should listen
- * to.<p> Returns metrics in the following format: <p>
- *
- * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
- * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
- * <p> }
+ * to.<p> Returns metrics in Prometheus text format via /metrics endpoint
*/
-public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements MonitorService {
+public class PrometheusHTTPMetricsServer extends HTTPMetricsServer {
private static final String PROM_DEFAULT_PREFIX = "Flume_";
private Server jettyServer;
private static Logger LOG = LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
- private FlumePrometheusCollector requests;
+ private FlumePrometheusCollector metricsCollector;
+
+ @Override
+ public String getType() {
+ return "prometheus";
+ }
@Override
public void start() {
- requests = new FlumePrometheusCollector().register();
+ metricsCollector = new FlumePrometheusCollector();
+ metricsCollector.register();
jettyServer = new Server();
// We can use Contexts etc if we have many urls to handle. For one url,
@@ -84,24 +84,27 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
jettyServer.setHandler(context);
- context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+ context.addServlet(new ServletHolder(new PrometheusMetricsServlet()), "/metrics");
try {
jettyServer.start();
while (!jettyServer.isStarted()) {
Thread.sleep(500);
}
} catch (Exception ex) {
- LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex);
+ LOG.error("Error starting Jetty. Prometheus Metrics may not be available.", ex);
}
}
- class FlumePrometheusCollector extends Collector {
-
- public List<MetricFamilySamples> collect() {
+ class FlumePrometheusCollector {
+ private final Map<String, Counter> counters = new HashMap<>();
+ private final Map<String, Gauge> gauges = new HashMap<>();
+ private final PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = new HashMap<>();
- List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+ public void register() {
+ collectMetrics();
+ }
+ private void collectMetrics() {
Set<ObjectInstance> queryMBeans;
try {
queryMBeans = mbeanServer.queryMBeans(null, null);
@@ -109,57 +112,44 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo
for (ObjectInstance obj : queryMBeans) {
try {
if (obj.getObjectName().toString().startsWith("org.apache.flume")) {
- processFlumeMetric(counterMetricMap, mfs, obj);
+ processFlumeMetric(obj);
} else if ((obj.getObjectName().toString().startsWith("kafka.consumer")
|| obj.getObjectName().toString().startsWith("kafka.producer"))
&& obj.getObjectName().toString().contains("metrics")) {
- processKafkaMetric(counterMetricMap, mfs, obj);
+ processKafkaMetric(obj);
}
} catch (Exception e) {
LOG.error("Unable to poll JMX for metrics.", e);
}
}
- return mfs;
} catch (Exception ex) {
LOG.error("Could not get Mbeans for monitoring", ex);
Throwables.propagate(ex);
- return null;
}
}
- private void processFlumeMetric(
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
- List<MetricFamilySamples> mfs,
- ObjectInstance obj)
+ private void processFlumeMetric(ObjectInstance obj)
throws ClassNotFoundException, InstanceNotFoundException,
IntrospectionException, ReflectionException {
- Class mbeanClass = Class.forName(obj.getClassName());
- Map<String, MetricFamilySamples> metricsMap;
-
- if (!counterMetricMap.containsKey(mbeanClass)) {
- metricsMap = new HashMap<>();
-
- for (Method method : mbeanClass.getMethods()) {
- String methodName = method.getName();
- if (methodName.startsWith("increment") && methodName.length() > "increment".length()) {
- String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length());
- createCounterIfNotExists(mfs, metricsMap, counterName);
- } else if (methodName.startsWith("addTo")) {
- String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length());
- createCounterIfNotExists(mfs, metricsMap, counterName);
- } else if (methodName.startsWith("set")) {
- String counterName = PROM_DEFAULT_PREFIX + methodName.substring("set".length());
- createGaugeIfNotExists(mfs, metricsMap, counterName, Arrays.asList("component"));
- }
+ Class<?> mbeanClass = Class.forName(obj.getClassName());
+
+ // First pass: create counters and gauges based on method names
+ for (Method method : mbeanClass.getMethods()) {
+ String methodName = method.getName();
+ if (methodName.startsWith("increment") && methodName.length() > "increment".length()) {
+ String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length());
+ createCounterIfNotExists(counterName);
+ } else if (methodName.startsWith("addTo")) {
+ String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length());
+ createCounterIfNotExists(counterName);
+ } else if (methodName.startsWith("set")) {
+ String gaugeName = PROM_DEFAULT_PREFIX + methodName.substring("set".length());
+ createGaugeIfNotExists(gaugeName);
}
-
- counterMetricMap.put(mbeanClass, metricsMap);
-
- } else {
- metricsMap = counterMetricMap.get(mbeanClass);
}
+ // Second pass: get attribute values and update metrics
MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
String[] strAtts = new String[attrs.length];
@@ -174,26 +164,25 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo
for (Object attr : attrList) {
Attribute localAttr = (Attribute) attr;
if (!localAttr.getName().equalsIgnoreCase("type")) {
- MetricFamilySamples samples = metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
- if (samples instanceof CounterMetricFamily) {
- ((CounterMetricFamily) samples)
- .addMetric(
- Arrays.asList(component),
- Double.valueOf(localAttr.getValue().toString()));
- } else if (samples instanceof GaugeMetricFamily) {
- ((GaugeMetricFamily) samples)
- .addMetric(
- Arrays.asList(component),
- Double.valueOf(localAttr.getValue().toString()));
+ String metricName = PROM_DEFAULT_PREFIX + localAttr.getName();
+ double value = Double.parseDouble(localAttr.getValue().toString());
+
+ Counter counter = counters.get(metricName);
+ if (counter != null) {
+ // For counters, we label by component
+ counter.labelValues(component).inc(value);
+ }
+
+ Gauge gauge = gauges.get(metricName);
+ if (gauge != null) {
+ // For gauges, we label by component
+ gauge.labelValues(component).set(value);
}
}
}
}
- private void processKafkaMetric(
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
- List<MetricFamilySamples> mfs,
- ObjectInstance obj)
+ private void processKafkaMetric(ObjectInstance obj)
throws InstanceNotFoundException, IntrospectionException,
ReflectionException {
ObjectName objectName = obj.getObjectName();
@@ -205,14 +194,9 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
makeStringPromSafe(key),
objectName.getKeyPropertyList().get(key));
}
- // We create a unique name for the metric based on the metric that came from Kafka, plus
- // all of the properties. Unfortunately Kafka does not have unique metric names and therefore
- // you can end up with metrics with differing property lists (which you can't have.
String metricKey = qualifiedType + "_" + String.join("_", properties.keySet()) + "_";
- Map<String, MetricFamilySamples> metricsMap;
-
- // Get the attribute list now as we'll need it to create the gauge
+ // Get the attribute list now as we'll need it to create gauges
MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
String[] strAtts = new String[attrs.length];
@@ -220,22 +204,10 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo
strAtts[i] = attrs[i].getName();
}
- // We pre-create each metric (once) before populating it once for each matching mbean
- if (!counterMetricMap.containsKey(metricKey)) {
- metricsMap = new HashMap<>();
-
- for (String attr : strAtts) {
- createGaugeIfNotExists(
- mfs,
- metricsMap,
- metricKey + "_" + makeStringPromSafe(attr),
- new ArrayList<>(properties.keySet()));
- }
-
- counterMetricMap.put(metricKey, metricsMap);
-
- } else {
- metricsMap = counterMetricMap.get(metricKey);
+ // Pre-create each metric (once) before populating it
+ for (String attr : strAtts) {
+ String gaugeName = metricKey + "_" + makeStringPromSafe(attr);
+ createGaugeIfNotExists(gaugeName);
}
AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
@@ -244,42 +216,43 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo
Attribute localAttr = (Attribute) attr;
try {
-
- GaugeMetricFamily samples = (GaugeMetricFamily)
- metricsMap.get(metricKey + "_" + makeStringPromSafe(localAttr.getName()));
- samples.addMetric(
- new ArrayList<>(properties.values()),
- Double.valueOf(localAttr.getValue().toString()));
+ String gaugeName = metricKey + "_" + makeStringPromSafe(localAttr.getName());
+ Gauge gauge = gauges.get(gaugeName);
+ if (gauge != null) {
+ double value = Double.parseDouble(localAttr.getValue().toString());
+ gauge.labelValues(new ArrayList<>(properties.values()).toArray(new String[0]))
+ .set(value);
+ }
} catch (Exception e) {
LOG.warn("Metric {} could not be monitored", metricKey, e);
}
}
}
- // Prometeus is really unhappy with metrics with , or - in, so replace them
+ // Prometheus is really unhappy with metrics with , or - in, so replace them
private String makeStringPromSafe(String input) {
return input.replaceAll("[.\\-]", "");
}
- private void createCounterIfNotExists(
- List<MetricFamilySamples> mfs, Map<String, MetricFamilySamples> metricsMap, String counterName) {
- if (!metricsMap.containsKey(counterName)) {
- CounterMetricFamily labeledCounter =
- new CounterMetricFamily(counterName, counterName, Arrays.asList("component"));
- metricsMap.put(counterName, labeledCounter);
- mfs.add(labeledCounter);
+ private void createCounterIfNotExists(String counterName) {
+ if (!counters.containsKey(counterName)) {
+ Counter counter = Counter.builder()
+ .name(counterName)
+ .help(counterName)
+ .labelNames("component")
+ .register();
+ counters.put(counterName, counter);
}
}
- private void createGaugeIfNotExists(
- List<MetricFamilySamples> mfs,
- Map<String, MetricFamilySamples> metricsMap,
- String gaugeName,
- List<String> labelNames) {
- if (!metricsMap.containsKey(gaugeName)) {
- GaugeMetricFamily labelledGauge = new GaugeMetricFamily(gaugeName, gaugeName, labelNames);
- metricsMap.put(gaugeName, labelledGauge);
- mfs.add(labelledGauge);
+ private void createGaugeIfNotExists(String gaugeName) {
+ if (!gauges.containsKey(gaugeName)) {
+ Gauge gauge = Gauge.builder()
+ .name(gaugeName)
+ .help(gaugeName)
+ .labelNames("component")
+ .register();
+ gauges.put(gaugeName, gauge);
}
}
}
diff --git a/flume-ng-instrumentation/flume-prometheus-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService b/flume-ng-instrumentation/flume-prometheus-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
new file mode 100644
index 000000000..042e10826
--- /dev/null
+++ b/flume-ng-instrumentation/flume-prometheus-monitor/src/main/resources/META-INF/services/org.apache.flume.instrumentation.MonitorService
@@ -0,0 +1 @@
+org.apache.flume.instrumentation.prometheus.PrometheusHTTPMetricsServer
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java b/flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/BaseHTTPMetricsTest.java
similarity index 98%
rename from flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
rename to flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/BaseHTTPMetricsTest.java
index 7ae7f36ac..1ac5812f6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
+++ b/flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/BaseHTTPMetricsTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flume.instrumentation.http;
+package org.apache.flume.instrumentation.prometheus;
import java.net.ServerSocket;
import org.apache.flume.Channel;
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java b/flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/TestPrometheusMetricsServer.java
similarity index 85%
rename from flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
rename to flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/TestPrometheusMetricsServer.java
index f6b78b9eb..671444e91 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
+++ b/flume-ng-instrumentation/flume-prometheus-monitor/src/test/java/org/apache/flume/instrumentation/prometheus/TestPrometheusMetricsServer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flume.instrumentation.http;
+package org.apache.flume.instrumentation.prometheus;
import java.io.BufferedReader;
import java.io.InputStreamReader;
@@ -54,18 +54,18 @@ public class TestPrometheusMetricsServer extends BaseHTTPMetricsTest {
}
reader.close();
String[] targetOutputs = {
- "ChannelSize{component=\"pmemChannel\",} 1.0\n",
- "Flume_ChannelSize{component=\"memChannel\",} 1.0\n",
- "Flume_ChannelCapacity{component=\"pmemChannel\",} 0.0\n",
- "Flume_ChannelCapacity{component=\"memChannel\",} 100.0\n",
- "Flume_EventPutAttemptCount_total{component=\"pmemChannel\",} 2.0\n",
- "Flume_EventPutAttemptCount_total{component=\"memChannel\",} 2.0\n",
- "Flume_EventTakeAttemptCount_total{component=\"pmemChannel\",} 1.0\n",
- "Flume_EventTakeAttemptCount_total{component=\"memChannel\",} 1.0\n",
- "Flume_EventPutSuccessCount_total{component=\"pmemChannel\",} 2.0\n",
- "Flume_EventPutSuccessCount_total{component=\"memChannel\",} 2.0\n",
- "Flume_EventTakeSuccessCount_total{component=\"pmemChannel\",} 1.0\n",
- "Flume_EventTakeSuccessCount_total{component=\"memChannel\",} 1.0\n"
+ "Flume_ChannelSize{component=\"pmemChannel\"} 1.0\n",
+ "Flume_ChannelSize{component=\"memChannel\"} 1.0\n",
+ "Flume_ChannelCapacity{component=\"pmemChannel\"} 0.0\n",
+ "Flume_ChannelCapacity{component=\"memChannel\"} 100.0\n",
+ "Flume_EventPutAttemptCount_total{component=\"pmemChannel\"} 2.0\n",
+ "Flume_EventPutAttemptCount_total{component=\"memChannel\"} 2.0\n",
+ "Flume_EventTakeAttemptCount_total{component=\"pmemChannel\"} 1.0\n",
+ "Flume_EventTakeAttemptCount_total{component=\"memChannel\"} 1.0\n",
+ "Flume_EventPutSuccessCount_total{component=\"pmemChannel\"} 2.0\n",
+ "Flume_EventPutSuccessCount_total{component=\"memChannel\"} 2.0\n",
+ "Flume_EventTakeSuccessCount_total{component=\"pmemChannel\"} 1.0\n",
+ "Flume_EventTakeSuccessCount_total{component=\"memChannel\"} 1.0\n"
};
for (String target : targetOutputs) {
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-instrumentation/pom.xml
similarity index 50%
copy from flume-ng-sources/flume-taildir-source/pom.xml
copy to flume-ng-instrumentation/pom.xml
index ffbc5a059..e664afbbb 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-instrumentation/pom.xml
@@ -21,49 +21,20 @@
<parent>
<groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sources</artifactId>
+ <artifactId>flume-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
+ <relativePath>../flume-parent/pom.xml</relativePath>
</parent>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-taildir-source</artifactId>
- <name>Flume Taildir Source</name>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-instrumentation</artifactId>
+ <packaging>pom</packaging>
+ <name>Flume Instrumentation</name>
- <properties>
- <!-- TODO fix spotbugs/pmd violations -->
- <spotbugs.maxAllowedViolations>24</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
- <module.name>org.apache.flume.source.taildir</module.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
+ <modules>
+ <module>flume-ganglia-monitor</module>
+ <module>flume-http-monitor</module>
+ <module>flume-prometheus-monitor</module>
+ </modules>
</project>
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 158297b4a..eeb5c2f01 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
@@ -49,7 +50,6 @@ import org.apache.flume.SinkRunner;
import org.apache.flume.Source;
import org.apache.flume.SourceRunner;
import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.MonitoringType;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
@@ -253,23 +253,13 @@ public class Application {
this.loadMonitoring();
}
- @SuppressWarnings("unchecked")
private void loadMonitoring() {
Properties systemProps = System.getProperties();
Set<String> keys = systemProps.stringPropertyNames();
try {
if (keys.contains(CONF_MONITOR_CLASS)) {
String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
- Class<? extends MonitorService> klass;
- try {
- // Is it a known type?
- klass = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH))
- .getMonitorClass();
- } catch (Exception e) {
- // Not a known type, use FQCN
- klass = (Class<? extends MonitorService>) Class.forName(monitorType);
- }
- this.monitorServer = klass.getConstructor().newInstance();
+ this.monitorServer = createMonitorService(monitorType);
Context context = new Context();
for (String key : keys) {
if (key.startsWith(CONF_MONITOR_PREFIX)) {
@@ -284,6 +274,23 @@ public class Application {
}
}
+ /**
+ * Resolves the configured monitoring type to a {@link MonitorService}.
+ *
+ * <p>The type is first matched (case-insensitively) against the {@link MonitorService#getType()} of the providers
+ * registered through the {@link ServiceLoader}; if none matches, it is treated as a fully qualified class name.</p>
+ */
+ private MonitorService createMonitorService(String monitorType) throws ReflectiveOperationException {
+ for (MonitorService service : ServiceLoader.load(MonitorService.class, Application.class.getClassLoader())) {
+ if (monitorType.equalsIgnoreCase(service.getType())) {
+ return service;
+ }
+ }
+ // Not a known type, use the fully qualified class name.
+ Class<? extends MonitorService> klass = Class.forName(monitorType).asSubclass(MonitorService.class);
+ return klass.getConstructor().newInstance();
+ }
+
public static void main(String[] args) {
Properties initProps = loadConfigOpts();
diff --git
a/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
index 39c348649..058115fe6 100644
---
a/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
+++
b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
@@ -16,6 +16,9 @@
*/
package org.apache.flume.node;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -24,18 +27,15 @@ import java.nio.file.Files;
import java.util.Base64;
import java.util.Enumeration;
import java.util.Properties;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.node.net.AuthorizationProvider;
import org.apache.flume.node.net.BasicAuthorizationProvider;
+import org.eclipse.jetty.ee11.servlet.DefaultServlet;
+import org.eclipse.jetty.ee11.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee11.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
diff --git a/flume-ng-sources/flume-http-source/pom.xml b/flume-ng-sources/flume-http-source/pom.xml
index 75df4f28f..1329f7a21 100644
--- a/flume-ng-sources/flume-http-source/pom.xml
+++ b/flume-ng-sources/flume-http-source/pom.xml
@@ -68,8 +68,13 @@
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty.ee11</groupId>
+ <artifactId>jetty-ee11-servlet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
</dependency>
<dependency>
@@ -98,6 +103,18 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk18on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/BLOBHandler.java
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/BLOBHandler.java
index 17bd2b3da..69f02e983 100644
---
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/BLOBHandler.java
+++
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/BLOBHandler.java
@@ -17,12 +17,12 @@
package org.apache.flume.source.http;
import com.google.common.base.Preconditions;
+import jakarta.servlet.http.HttpServletRequest;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flume.Context;
diff --git
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPServerConstraintUtil.java
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPServerConstraintUtil.java
index cf1a3932d..5ec4a82b7 100644
---
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPServerConstraintUtil.java
+++
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPServerConstraintUtil.java
@@ -16,11 +16,9 @@
*/
package org.apache.flume.source.http;
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.util.security.Constraint;
-
-// Most of the code in this class is copied from HBASE-10473
+import org.eclipse.jetty.ee11.servlet.security.ConstraintMapping;
+import org.eclipse.jetty.ee11.servlet.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.Constraint;
/**
* Utility class to define constraints on Jetty HTTP servers
@@ -34,22 +32,24 @@ class HTTPServerConstraintUtil {
* @return ConstraintSecurityHandler for use with Jetty servlet
*/
static ConstraintSecurityHandler enforceConstraints() {
- Constraint c = new Constraint();
- c.setAuthenticate(true);
+ // 1. Create a constraint that denies TRACE and OPTIONS access
+ Constraint constraint = Constraint.from("Deny Methods", Constraint.Authorization.FORBIDDEN);
- ConstraintMapping cmt = new ConstraintMapping();
- cmt.setConstraint(c);
- cmt.setMethod("TRACE");
- cmt.setPathSpec("/*");
+ // 2. Map the constraint to methods TRACE and OPTIONS on all paths
+ ConstraintMapping traceMapping = new ConstraintMapping();
+ traceMapping.setPathSpec("/*");
+ traceMapping.setMethod("TRACE");
+ traceMapping.setConstraint(constraint);
- ConstraintMapping cmo = new ConstraintMapping();
- cmo.setConstraint(c);
- cmo.setMethod("OPTIONS");
- cmo.setPathSpec("/*");
+ ConstraintMapping optionsMapping = new ConstraintMapping();
+ optionsMapping.setPathSpec("/*");
+ optionsMapping.setMethod("OPTIONS");
+ optionsMapping.setConstraint(constraint);
- ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
- sh.setConstraintMappings(new ConstraintMapping[] {cmt, cmo});
+ // 3. Configure the ConstraintSecurityHandler
+ ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
+ securityHandler.setConstraintMappings(new ConstraintMapping[] {traceMapping, optionsMapping});
- return sh;
+ return securityHandler;
}
}
diff --git
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSource.java
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSource.java
index 567e0631c..9d3b3c63f 100644
---
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -18,14 +18,14 @@ package org.apache.flume.source.http;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
@@ -34,6 +34,8 @@ import org.apache.flume.exception.ChannelException;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.SslContextAwareAbstractSource;
import org.apache.flume.tools.FlumeBeanConfigurator;
+import org.eclipse.jetty.ee11.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee11.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.HttpConfiguration;
@@ -42,8 +44,6 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
@@ -162,7 +162,7 @@ public class HTTPSource extends SslContextAwareAbstractSource implements EventDr
ServerConnector connector = getSslContextSupplier()
.get()
.map(sslContext -> {
- SslContextFactory sslCtxFactory = new SslContextFactory();
+ SslContextFactory.Server sslCtxFactory = new SslContextFactory.Server();
sslCtxFactory.setSslContext(sslContext);
sslCtxFactory.setExcludeProtocols(getExcludeProtocols().toArray(new String[] {}));
sslCtxFactory.setIncludeProtocols(getIncludeProtocols().toArray(new String[] {}));
diff --git
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
index b0aa5a079..3840ec7b5 100644
---
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
+++
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
@@ -16,8 +16,8 @@
*/
package org.apache.flume.source.http;
+import jakarta.servlet.http.HttpServletRequest;
import java.util.List;
-import javax.servlet.http.HttpServletRequest;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
diff --git
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/JSONHandler.java
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/JSONHandler.java
index 961766a0f..a6e10b820 100644
---
a/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/JSONHandler.java
+++
b/flume-ng-sources/flume-http-source/src/main/java/org/apache/flume/source/http/JSONHandler.java
@@ -20,12 +20,12 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
+import jakarta.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.lang.reflect.Type;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.List;
-import javax.servlet.http.HttpServletRequest;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
diff --git
a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
index e13b739fa..3e6a40f1c 100644
---
a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
+++
b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
@@ -16,6 +16,21 @@
*/
package org.apache.flume.source.http;
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.DispatcherType;
+import jakarta.servlet.RequestDispatcher;
+import jakarta.servlet.ServletConnection;
+import jakarta.servlet.ServletContext;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.Cookie;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import jakarta.servlet.http.HttpSession;
+import jakarta.servlet.http.HttpUpgradeHandler;
+import jakarta.servlet.http.Part;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -26,20 +41,6 @@ import java.util.Collection;
import java.util.Enumeration;
import java.util.Locale;
import java.util.Map;
-import javax.servlet.AsyncContext;
-import javax.servlet.DispatcherType;
-import javax.servlet.RequestDispatcher;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-import javax.servlet.http.HttpUpgradeHandler;
-import javax.servlet.http.Part;
/**
*
@@ -179,11 +180,6 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest {
throw new UnsupportedOperationException("Not supported yet.");
}
- @Override
- public boolean isRequestedSessionIdFromUrl() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
@Override
public Object getAttribute(String name) {
throw new UnsupportedOperationException("Not supported yet.");
@@ -304,11 +300,6 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest {
throw new UnsupportedOperationException("Not supported yet.");
}
- @Override
- public String getRealPath(String path) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
@Override
public int getRemotePort() {
throw new UnsupportedOperationException("Not supported yet.");
@@ -403,4 +394,19 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest {
public <T extends HttpUpgradeHandler> T upgrade(Class<T> arg0) throws IOException, ServletException {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public String getRequestId() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public String getProtocolRequestId() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public ServletConnection getServletConnection() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
diff --git a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
index 655bd7d32..317738536 100644
--- a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
+++ b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
@@ -20,15 +20,15 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.servlet.ReadListener;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.junit.Before;
diff --git a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index 7fda74cf0..fc950b8d7 100644
--- a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -23,6 +23,9 @@ import static org.mockito.Mockito.doThrow;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Type;
@@ -32,8 +35,9 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -46,14 +50,10 @@ import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;
-import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import javax.servlet.http.HttpServletResponse;
+import javax.net.ssl.TrustManagerFactory;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
@@ -72,7 +72,6 @@ import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.After;
@@ -80,7 +79,9 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
/**
@@ -120,14 +121,14 @@ public class TestHTTPSource {
Context sslContext = new Context();
sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port));
sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
- sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password");
- sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore");
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, KEYSTORE_PASSWORD);
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, serverKeystorePath);
return sslContext;
}
private static Context getDefaultSecureContextGlobalKeystore(int port) throws IOException {
- System.setProperty("javax.net.ssl.keyStore", "src/test/resources/jettykeystore");
- System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStore", serverKeystorePath);
+ System.setProperty("javax.net.ssl.keyStorePassword", KEYSTORE_PASSWORD);
Context sslContext = new Context();
sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port));
@@ -135,8 +136,40 @@ public class TestHTTPSource {
return sslContext;
}
+ private static final String KEYSTORE_PASSWORD = "password";
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ // HTTPSource loads its keystore from a file, so the server material is written to disk; the
+ // client truststore below is kept in memory.
+ private static String serverKeystorePath;
+ private static KeyStore trustStore;
+
+ private static void generateSslStores() throws Exception {
+ KeyPair keyPair = X509Certificates.generateKeyPair();
+ X509Certificate certificate = X509Certificates.generateSelfSignedCertificate(keyPair, "CN=localhost");
+
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(null, null);
+ keyStore.setKeyEntry(
+ "jetty", keyPair.getPrivate(), KEYSTORE_PASSWORD.toCharArray(), new X509Certificate[] {certificate});
+ File keystoreFile = TEMP_FOLDER.newFile("keystore.jks");
+ try (FileOutputStream out = new FileOutputStream(keystoreFile)) {
+ keyStore.store(out, KEYSTORE_PASSWORD.toCharArray());
+ }
+ serverKeystorePath = keystoreFile.getAbsolutePath();
+
+ // The test client only needs to trust the self-signed server certificate.
+ trustStore = KeyStore.getInstance("JKS");
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("server", certificate);
+ }
+
@BeforeClass
public static void setUpClass() throws Exception {
+ generateSslStores();
+
httpSource = new HTTPSource();
httpChannel = new MemoryChannel();
httpPort = findFreePort();
@@ -526,26 +559,6 @@ public class TestHTTPSource {
HttpsURLConnection httpsURLConnection = null;
Transaction transaction = null;
try {
- TrustManager[] trustAllCerts = {
- new X509TrustManager() {
- @Override
- public void checkClientTrusted(java.security.cert.X509Certificate[] x509Certificates, String s)
- throws CertificateException {
- // noop
- }
-
- @Override
- public void checkServerTrusted(java.security.cert.X509Certificate[] x509Certificates, String s)
- throws CertificateException {
- // noop
- }
-
- public java.security.cert.X509Certificate[] getAcceptedIssuers() {
- return null;
- }
- }
- };
-
SSLContext sc = null;
javax.net.ssl.SSLSocketFactory factory = null;
if (System.getProperty("java.vendor").contains("IBM")) {
@@ -554,12 +567,9 @@ public class TestHTTPSource {
sc = SSLContext.getInstance("SSL");
}
- HostnameVerifier hv = new HostnameVerifier() {
- public boolean verify(String arg0, SSLSession arg1) {
- return true;
- }
- };
- sc.init(null, trustAllCerts, new SecureRandom());
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(trustStore);
+ sc.init(null, tmf.getTrustManagers(), null);
if (protocol != null) {
factory = new DisabledProtocolsSocketFactory(sc.getSocketFactory(), protocol);
@@ -567,8 +577,7 @@ public class TestHTTPSource {
factory = sc.getSocketFactory();
}
HttpsURLConnection.setDefaultSSLSocketFactory(factory);
- HttpsURLConnection.setDefaultHostnameVerifier(NoopHostnameVerifier.INSTANCE);
- URL sslUrl = new URL("https://0.0.0.0:" + port);
+ URL sslUrl = new URL("https://localhost:" + port);
httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection();
httpsURLConnection.setDoInput(true);
httpsURLConnection.setDoOutput(true);
diff --git a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestJSONHandler.java b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
index d5f90f7e2..a5837a85d 100644
--- a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
+++ b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
@@ -20,12 +20,12 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Type;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import javax.servlet.http.HttpServletRequest;
import junit.framework.Assert;
import org.apache.flume.Event;
import org.apache.flume.event.JSONEvent;
diff --git a/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/X509Certificates.java b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/X509Certificates.java
new file mode 100644
index 000000000..c98ece50f
--- /dev/null
+++ b/flume-ng-sources/flume-http-source/src/test/java/org/apache/flume/source/http/X509Certificates.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import java.math.BigInteger;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.Random;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.BasicConstraints;
+import org.bouncycastle.asn1.x509.ExtendedKeyUsage;
+import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.KeyPurposeId;
+import org.bouncycastle.asn1.x509.KeyUsage;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+
+/**
+ * Utility class to generate X.509 certificates for testing purposes.
+ */
+final class X509Certificates {
+
+ private static final long MINUTE_IN_MILLIS = 60_000L;
+ private static final long YEAR_IN_MILLIS = 365L * 24 * 60 * MINUTE_IN_MILLIS;
+
+ private static final KeyPairGenerator RSA_GENERATOR;
+ private static final Random RANDOM = new Random();
+
+ static {
+ try {
+ RSA_GENERATOR = KeyPairGenerator.getInstance("RSA");
+ RSA_GENERATOR.initialize(2048);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static KeyPair generateKeyPair() {
+ return RSA_GENERATOR.generateKeyPair();
+ }
+
+ /**
+ * Create a self-signed X.509 server certificate for tests.
+ *
+ * @param keyPair the certificate key pair
+ * @param subjectDn the subject distinguished name (for example {@code CN=localhost})
+ * @return a self-signed X.509 server certificate
+ * @throws Exception if certificate creation or signing fails
+ */
+ static X509Certificate generateSelfSignedCertificate(KeyPair keyPair, String subjectDn) throws Exception {
+ long now = System.currentTimeMillis();
+ Date notBefore = new Date(now - MINUTE_IN_MILLIS);
+ Date notAfter = new Date(now + YEAR_IN_MILLIS);
+ BigInteger serial = BigInteger.valueOf(RANDOM.nextLong()).abs();
+
+ X500Name dn = new X500Name(subjectDn);
+ JcaX509v3CertificateBuilder builder =
+ new JcaX509v3CertificateBuilder(dn, serial, notBefore, notAfter, dn, keyPair.getPublic());
+
+ builder.addExtension(Extension.basicConstraints, true, new BasicConstraints(false));
+ // The required key usage for the server certificate depends on the key exchange algorithm:
+ // - keyEncipherment for RSA key exchange (deprecated)
+ // - digitalSignature for ephemeral Diffie-Hellman key exchange (DHE or ECDHE)
+ // - keyAgreement for static Diffie-Hellman key exchange (DH or ECDH)
+ builder.addExtension(Extension.keyUsage, true, new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyAgreement));
+ builder.addExtension(Extension.extendedKeyUsage, false, new ExtendedKeyUsage(KeyPurposeId.id_kp_serverAuth));
+
+ ContentSigner signer = new JcaContentSignerBuilder("SHA256withRSA").build(keyPair.getPrivate());
+ X509CertificateHolder holder = builder.build(signer);
+ return new JcaX509CertificateConverter().getCertificate(holder);
+ }
+
+ private X509Certificates() {
+ // private constructor to prevent instantiation
+ }
+}
diff --git a/flume-ng-sources/flume-http-source/src/test/resources/jettykeystore b/flume-ng-sources/flume-http-source/src/test/resources/jettykeystore
deleted file mode 100644
index db76bcbc0..000000000
Binary files a/flume-ng-sources/flume-http-source/src/test/resources/jettykeystore and /dev/null differ
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml
index ffbc5a059..c4e34bb06 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-sources/flume-taildir-source/pom.xml
@@ -52,6 +52,11 @@
<artifactId>flume-ng-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/flume-parent/pom.xml b/flume-parent/pom.xml
index 9035c0352..0961fa6b0 100644
--- a/flume-parent/pom.xml
+++ b/flume-parent/pom.xml
@@ -252,9 +252,11 @@
<hadoop3.version>3.2.2</hadoop3.version>
<hadoop.version>${hadoop3.version}</hadoop.version>
<httpcore.version>4.4.15</httpcore.version>
+ <bouncycastle.version>1.84</bouncycastle.version>
<httpclient.version>4.5.13</httpclient.version>
<irclib.version>1.10</irclib.version>
- <jetty.version>9.4.51.v20230217</jetty.version>
+ <jakarta-servlet.version>6.1.0</jakarta-servlet.version>
+ <jetty.version>12.1.9</jetty.version>
<junit.version>4.13.2</junit.version>
<log4j.version>2.26.0</log4j.version>
<mapdb.version>0.9.9</mapdb.version>
@@ -280,7 +282,7 @@
<netty-all.version>4.1.86.Final</netty-all.version>
<external.protobuf.version>4.35.0</external.protobuf.version>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
- <prometheus.version>0.15.0</prometheus.version>
+ <prometheus.version>1.7.0</prometheus.version>
<rat.version>0.12</rat.version>
<snappy-java.version>1.1.8.4</snappy-java.version>
<slf4j.version>1.7.32</slf4j.version>
@@ -298,6 +300,14 @@
<dependencyManagement>
<dependencies>
<!-- Dependencies: build -->
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bc-jdk18on-bom</artifactId>
+ <version>${bouncycastle.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -409,8 +419,16 @@
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ <!-- Use 5.0.0 for Jetty 11 / EE9. Use 6.0.0 or 6.1.0 for Jetty 12 (EE10) -->
+ <version>${jakarta-servlet.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.ee11</groupId>
+ <artifactId>jetty-ee11-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
@@ -560,13 +578,13 @@
<dependency>
<groupId>io.prometheus</groupId>
- <artifactId>simpleclient</artifactId>
+ <artifactId>prometheus-metrics-core</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
- <artifactId>simpleclient_servlet</artifactId>
+ <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId>
<version>${prometheus.version}</version>
</dependency>
diff --git a/pom.xml b/pom.xml
index 7e360462d..c13d27277 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
<module>flume-ng-core</module>
<module>flume-ng-configuration</module>
<module>flume-ng-sources</module>
+ <module>flume-ng-instrumentation</module>
<module>flume-ng-node</module>
<!-- Disable until all snapshots are published
<module>flume-ng-dist</module>