This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch Prometheus-and-Jetty
in repository https://gitbox.apache.org/repos/asf/logging-flume.git

commit ee634d488ff0367b49aa4fa041f3d59fecc8dc2d
Author: Ralph Goers <[email protected]>
AuthorDate: Tue Jun 9 08:15:52 2026 -0700

    Upgrade Prometheus and Jetty
---
 flume-ng-core/pom.xml                              |   9 +-
 .../instrumentation/http/HTTPMetricsServer.java    |  69 +-
 .../http/PrometheusHTTPMetricsServer.java          | 200 +++---
 .../org/apache/flume/source/http/BLOBHandler.java  |  98 ---
 .../flume/source/http/HTTPBadRequestException.java |  42 --
 .../org/apache/flume/source/http/HTTPSource.java   | 296 ---------
 .../http/HTTPSourceConfigurationConstants.java     |  45 --
 .../flume/source/http/HTTPSourceHandler.java       |  42 --
 .../org/apache/flume/source/http/JSONHandler.java  | 133 ----
 .../flume/tools/HTTPServerConstraintUtil.java      |  36 +-
 .../http/TestHTTPMetricsServer.java                |   2 +-
 .../flume/source/TestDefaultSourceFactory.java     |   5 -
 .../http/FlumeHttpServletRequestWrapper.java       | 406 ------------
 .../apache/flume/source/http/TestBLOBHandler.java  | 173 -----
 .../apache/flume/source/http/TestHTTPSource.java   | 732 ---------------------
 .../apache/flume/source/http/TestJSONHandler.java  | 235 -------
 flume-parent/pom.xml                               |  21 +-
 17 files changed, 156 insertions(+), 2388 deletions(-)

diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 2c1ee40fa..a5a4d128e 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -131,9 +131,10 @@
       <artifactId>netty-all</artifactId>
     </dependency>
 
+
     <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
+      <groupId>org.eclipse.jetty.ee11</groupId>
+      <artifactId>jetty-ee11-servlet</artifactId>
     </dependency>
 
     <dependency>
@@ -181,12 +182,12 @@
 
     <dependency>
       <groupId>io.prometheus</groupId>
-      <artifactId>simpleclient</artifactId>
+      <artifactId>prometheus-metrics-core</artifactId>
     </dependency>
 
     <dependency>
       <groupId>io.prometheus</groupId>
-      <artifactId>simpleclient_servlet</artifactId>
+      <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId>
     </dependency>
 
   </dependencies>
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
index 1fefc5ec7..506ad59ed 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -18,21 +18,21 @@ package org.apache.flume.instrumentation.http;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import java.io.IOException;
-import java.lang.reflect.Type;
+import jakarta.servlet.http.HttpServletResponse;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import org.apache.flume.Context;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,45 +94,44 @@ public class HTTPMetricsServer implements MonitorService {
         port = context.getInteger(CONFIG_PORT, DEFAULT_PORT);
     }
 
-    private class HTTPMetricsHandler extends AbstractHandler {
+    private class HTTPMetricsHandler extends Handler.Abstract {
 
-        Type mapType = new TypeToken<Map<String, Map<String, String>>>() 
{}.getType();
+        java.lang.reflect.Type mapType = new TypeToken<Map<String, Map<String, 
String>>>() {}.getType();
         Gson gson = new Gson();
 
         @Override
-        public void handle(String target, Request r1, HttpServletRequest 
request, HttpServletResponse response)
-                throws IOException, ServletException {
-            // /metrics is the only place to pull metrics.
-            // If we want to use any other url for something else, we should 
make sure
-            // that for metrics only /metrics is used to prevent backward
-            // compatibility issues.
-            if (request.getMethod().equalsIgnoreCase("TRACE")
-                    || request.getMethod().equalsIgnoreCase("OPTIONS")) {
-                response.sendError(HttpServletResponse.SC_FORBIDDEN);
-                response.flushBuffer();
-                ((Request) request).setHandled(true);
-                return;
+        public boolean handle(Request request, Response response, Callback 
callback) throws Exception {
+            String method = request.getMethod();
+            String path = Request.getPathInContext(request);
+
+            if (method.equalsIgnoreCase("TRACE") || 
method.equalsIgnoreCase("OPTIONS")) {
+                response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+                callback.succeeded();
+                return true;
             }
-            if (target.equals("/")) {
-                response.setContentType("text/html;charset=utf-8");
+
+            if ("/".equals(path)) {
                 response.setStatus(HttpServletResponse.SC_OK);
-                response.getWriter().write("For Flume metrics please click" + 
" <a href = \"./metrics\"> here</a>.");
-                response.flushBuffer();
-                ((Request) request).setHandled(true);
-                return;
-            } else if (target.equalsIgnoreCase("/metrics")) {
-                response.setContentType("application/json;charset=utf-8");
+                response.getHeaders().put("Content-Type", 
"text/html;charset=utf-8");
+                String html = "For Flume metrics please click <a 
href=\"./metrics\"> here</a>.";
+                response.write(true, 
ByteBuffer.wrap(html.getBytes(StandardCharsets.UTF_8)), callback);
+                return true;
+            }
+
+            if ("/metrics".equalsIgnoreCase(path)) {
                 response.setStatus(HttpServletResponse.SC_OK);
+                response.getHeaders().put("Content-Type", 
"application/json;charset=utf-8");
+
                 Map<String, Map<String, String>> metricsMap = 
JMXPollUtil.getAllMBeans();
                 String json = gson.toJson(metricsMap, mapType);
-                response.getWriter().write(json);
-                response.flushBuffer();
-                ((Request) request).setHandled(true);
-                return;
+
+                response.write(true, 
ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)), callback);
+                return true;
             }
-            response.sendError(HttpServletResponse.SC_NOT_FOUND);
-            response.flushBuffer();
-            // Not handling the request returns a Not found error page.
+
+            response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+            callback.succeeded();
+            return true;
         }
     }
 }
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
index 3512050eb..ea876b131 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
@@ -17,16 +17,14 @@
 package org.apache.flume.instrumentation.http;
 
 import com.google.common.base.Throwables;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import io.prometheus.client.GaugeMetricFamily;
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.metrics.core.metrics.Counter;
+import io.prometheus.metrics.core.metrics.Gauge;
+import io.prometheus.metrics.model.registry.PrometheusRegistry;
+import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -44,20 +42,16 @@ import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.ee11.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee11.servlet.ServletHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A Monitor service implementation that runs a web server on a configurable
- * port and returns the metrics for components in JSON format. <p> Optional
+ * port and returns the metrics for components in Prometheus format. <p> 
Optional
  * parameters: <p> <tt>port</tt> : The port on which the server should listen
- * to.<p> Returns metrics in the following format: <p>
- *
- * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
- * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
- * <p> }
+ * to.<p> Returns metrics in Prometheus text format via /metrics endpoint
  */
 public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements 
MonitorService {
 
@@ -66,12 +60,13 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
     private static Logger LOG = 
LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
     private static MBeanServer mbeanServer = 
ManagementFactory.getPlatformMBeanServer();
 
-    private FlumePrometheusCollector requests;
+    private FlumePrometheusCollector metricsCollector;
 
     @Override
     public void start() {
 
-        requests = new FlumePrometheusCollector().register();
+        metricsCollector = new FlumePrometheusCollector();
+        metricsCollector.register();
 
         jettyServer = new Server();
         // We can use Contexts etc if we have many urls to handle. For one url,
@@ -84,24 +79,27 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
         ServletContextHandler context = new ServletContextHandler();
         context.setContextPath("/");
         jettyServer.setHandler(context);
-        context.addServlet(new ServletHolder(new MetricsServlet()), 
"/metrics");
+        context.addServlet(new ServletHolder(new PrometheusMetricsServlet()), 
"/metrics");
         try {
             jettyServer.start();
             while (!jettyServer.isStarted()) {
                 Thread.sleep(500);
             }
         } catch (Exception ex) {
-            LOG.error("Error starting Jetty. JSON Metrics may not be 
available.", ex);
+            LOG.error("Error starting Jetty. Prometheus Metrics may not be 
available.", ex);
         }
     }
 
-    class FlumePrometheusCollector extends Collector {
-
-        public List<MetricFamilySamples> collect() {
+    class FlumePrometheusCollector {
+        private final Map<String, Counter> counters = new HashMap<>();
+        private final Map<String, Gauge> gauges = new HashMap<>();
+        private final PrometheusRegistry registry = 
PrometheusRegistry.defaultRegistry;
 
-            Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = 
new HashMap<>();
-            List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+        public void register() {
+            collectMetrics();
+        }
 
+        private void collectMetrics() {
             Set<ObjectInstance> queryMBeans;
             try {
                 queryMBeans = mbeanServer.queryMBeans(null, null);
@@ -109,57 +107,44 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 for (ObjectInstance obj : queryMBeans) {
                     try {
                         if 
(obj.getObjectName().toString().startsWith("org.apache.flume")) {
-                            processFlumeMetric(counterMetricMap, mfs, obj);
+                            processFlumeMetric(obj);
                         } else if 
((obj.getObjectName().toString().startsWith("kafka.consumer")
-                                        || 
obj.getObjectName().toString().startsWith("kafka.producer"))
+                                || 
obj.getObjectName().toString().startsWith("kafka.producer"))
                                 && 
obj.getObjectName().toString().contains("metrics")) {
-                            processKafkaMetric(counterMetricMap, mfs, obj);
+                            processKafkaMetric(obj);
                         }
 
                     } catch (Exception e) {
                         LOG.error("Unable to poll JMX for metrics.", e);
                     }
                 }
-                return mfs;
 
             } catch (Exception ex) {
                 LOG.error("Could not get Mbeans for monitoring", ex);
                 Throwables.propagate(ex);
-                return null;
             }
         }
 
-        private void processFlumeMetric(
-                Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
-                List<MetricFamilySamples> mfs,
-                ObjectInstance obj)
+        private void processFlumeMetric(ObjectInstance obj)
                 throws ClassNotFoundException, InstanceNotFoundException, 
IntrospectionException, ReflectionException {
-            Class mbeanClass = Class.forName(obj.getClassName());
-            Map<String, MetricFamilySamples> metricsMap;
-
-            if (!counterMetricMap.containsKey(mbeanClass)) {
-                metricsMap = new HashMap<>();
-
-                for (Method method : mbeanClass.getMethods()) {
-                    String methodName = method.getName();
-                    if (methodName.startsWith("increment") && 
methodName.length() > "increment".length()) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("increment".length());
-                        createCounterIfNotExists(mfs, metricsMap, counterName);
-                    } else if (methodName.startsWith("addTo")) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("addTo".length());
-                        createCounterIfNotExists(mfs, metricsMap, counterName);
-                    } else if (methodName.startsWith("set")) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("set".length());
-                        createGaugeIfNotExists(mfs, metricsMap, counterName, 
Arrays.asList("component"));
-                    }
+            Class<?> mbeanClass = Class.forName(obj.getClassName());
+
+            // First pass: create counters and gauges based on method names
+            for (Method method : mbeanClass.getMethods()) {
+                String methodName = method.getName();
+                if (methodName.startsWith("increment") && methodName.length() 
> "increment".length()) {
+                    String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("increment".length());
+                    createCounterIfNotExists(counterName);
+                } else if (methodName.startsWith("addTo")) {
+                    String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("addTo".length());
+                    createCounterIfNotExists(counterName);
+                } else if (methodName.startsWith("set")) {
+                    String gaugeName = PROM_DEFAULT_PREFIX + 
methodName.substring("set".length());
+                    createGaugeIfNotExists(gaugeName);
                 }
-
-                counterMetricMap.put(mbeanClass, metricsMap);
-
-            } else {
-                metricsMap = counterMetricMap.get(mbeanClass);
             }
 
+            // Second pass: get attribute values and update metrics
             MBeanAttributeInfo[] attrs =
                     
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
             String[] strAtts = new String[attrs.length];
@@ -174,26 +159,25 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
             for (Object attr : attrList) {
                 Attribute localAttr = (Attribute) attr;
                 if (!localAttr.getName().equalsIgnoreCase("type")) {
-                    MetricFamilySamples samples = 
metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
-                    if (samples instanceof CounterMetricFamily) {
-                        ((CounterMetricFamily) samples)
-                                .addMetric(
-                                        Arrays.asList(component),
-                                        
Double.valueOf(localAttr.getValue().toString()));
-                    } else if (samples instanceof GaugeMetricFamily) {
-                        ((GaugeMetricFamily) samples)
-                                .addMetric(
-                                        Arrays.asList(component),
-                                        
Double.valueOf(localAttr.getValue().toString()));
+                    String metricName = PROM_DEFAULT_PREFIX + 
localAttr.getName();
+                    double value = 
Double.parseDouble(localAttr.getValue().toString());
+
+                    Counter counter = counters.get(metricName);
+                    if (counter != null) {
+                        // For counters, we label by component
+                        counter.labelValues(component).inc(value);
+                    }
+
+                    Gauge gauge = gauges.get(metricName);
+                    if (gauge != null) {
+                        // For gauges, we label by component
+                        gauge.labelValues(component).set(value);
                     }
                 }
             }
         }
 
-        private void processKafkaMetric(
-                Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
-                List<MetricFamilySamples> mfs,
-                ObjectInstance obj)
+        private void processKafkaMetric(ObjectInstance obj)
                 throws InstanceNotFoundException, IntrospectionException, 
ReflectionException {
 
             ObjectName objectName = obj.getObjectName();
@@ -201,18 +185,12 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
 
             TreeMap<String, String> properties = new TreeMap<>();
             for (String key : objectName.getKeyPropertyList().keySet()) {
-                properties.put(
-                        makeStringPromSafe(key), 
objectName.getKeyPropertyList().get(key));
+                properties.put(makeStringPromSafe(key), 
objectName.getKeyPropertyList().get(key));
             }
 
-            // We create a unique name for the metric based on the metric that 
came from Kafka, plus
-            // all of the properties. Unfortunately Kafka does not have unique 
metric names and therefore
-            // you can end up with metrics with differing property lists 
(which you can't have.
             String metricKey = qualifiedType + "_" + String.join("_", 
properties.keySet()) + "_";
 
-            Map<String, MetricFamilySamples> metricsMap;
-
-            // Get the attribute list now as we'll need it to create the gauge
+            // Get the attribute list now as we'll need it to create gauges
             MBeanAttributeInfo[] attrs =
                     
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
             String[] strAtts = new String[attrs.length];
@@ -220,22 +198,10 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 strAtts[i] = attrs[i].getName();
             }
 
-            // We pre-create each metric (once) before populating it once for 
each matching mbean
-            if (!counterMetricMap.containsKey(metricKey)) {
-                metricsMap = new HashMap<>();
-
-                for (String attr : strAtts) {
-                    createGaugeIfNotExists(
-                            mfs,
-                            metricsMap,
-                            metricKey + "_" + makeStringPromSafe(attr),
-                            new ArrayList<>(properties.keySet()));
-                }
-
-                counterMetricMap.put(metricKey, metricsMap);
-
-            } else {
-                metricsMap = counterMetricMap.get(metricKey);
+            // Pre-create each metric (once) before populating it
+            for (String attr : strAtts) {
+                String gaugeName = metricKey + "_" + makeStringPromSafe(attr);
+                createGaugeIfNotExists(gaugeName);
             }
 
             AttributeList attrList = 
mbeanServer.getAttributes(obj.getObjectName(), strAtts);
@@ -244,42 +210,42 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 Attribute localAttr = (Attribute) attr;
 
                 try {
-
-                    GaugeMetricFamily samples = (GaugeMetricFamily)
-                            metricsMap.get(metricKey + "_" + 
makeStringPromSafe(localAttr.getName()));
-                    samples.addMetric(
-                            new ArrayList<>(properties.values()),
-                            Double.valueOf(localAttr.getValue().toString()));
+                    String gaugeName = metricKey + "_" + 
makeStringPromSafe(localAttr.getName());
+                    Gauge gauge = gauges.get(gaugeName);
+                    if (gauge != null) {
+                        double value = 
Double.parseDouble(localAttr.getValue().toString());
+                        gauge.labelValues(new 
ArrayList<>(properties.values()).toArray(new String[0]))
+                                .set(value);
+                    }
                 } catch (Exception e) {
                     LOG.warn("Metric {} could not be monitored", metricKey, e);
                 }
             }
         }
 
-        // Prometeus is really unhappy with metrics with , or - in, so replace 
them
+        // Prometheus is really unhappy with metrics with , or - in, so 
replace them
         private String makeStringPromSafe(String input) {
             return input.replaceAll("[.\\-]", "");
         }
 
-        private void createCounterIfNotExists(
-                List<MetricFamilySamples> mfs, Map<String, 
MetricFamilySamples> metricsMap, String counterName) {
-            if (!metricsMap.containsKey(counterName)) {
-                CounterMetricFamily labeledCounter =
-                        new CounterMetricFamily(counterName, counterName, 
Arrays.asList("component"));
-                metricsMap.put(counterName, labeledCounter);
-                mfs.add(labeledCounter);
+        private void createCounterIfNotExists(String counterName) {
+            if (!counters.containsKey(counterName)) {
+                Counter counter = Counter.builder()
+                        .name(counterName)
+                        .help(counterName)
+                        .labelNames("component")
+                        .register();
+                counters.put(counterName, counter);
             }
         }
 
-        private void createGaugeIfNotExists(
-                List<MetricFamilySamples> mfs,
-                Map<String, MetricFamilySamples> metricsMap,
-                String gaugeName,
-                List<String> labelNames) {
-            if (!metricsMap.containsKey(gaugeName)) {
-                GaugeMetricFamily labelledGauge = new 
GaugeMetricFamily(gaugeName, gaugeName, labelNames);
-                metricsMap.put(gaugeName, labelledGauge);
-                mfs.add(labelledGauge);
+        private void createGaugeIfNotExists(String gaugeName) {
+            if (!gauges.containsKey(gaugeName)) {
+                Gauge gauge = Gauge.builder()
+                        .name(gaugeName)
+                        .help(gaugeName)
+                        .register();
+                gauges.put(gaugeName, gauge);
             }
         }
     }
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
deleted file mode 100644
index 17bd2b3da..000000000
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import com.google.common.base.Preconditions;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.conf.LogPrivacyUtil;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * BLOBHandler for HTTPSource that accepts any binary stream of data as event.
- *
- */
-public class BLOBHandler implements HTTPSourceHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(BLOBHandler.class);
-
-    private String commaSeparatedHeaders;
-
-    private String[] mandatoryHeaders;
-
-    public static final String MANDATORY_PARAMETERS = "mandatoryParameters";
-
-    public static final String DEFAULT_MANDATORY_PARAMETERS = "";
-
-    public static final String PARAMETER_SEPARATOR = ",";
-
-    /**
-     * {@inheritDoc}
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public List<Event> getEvents(HttpServletRequest request) throws Exception {
-        Map<String, String> headers = new HashMap<String, String>();
-
-        InputStream inputStream = request.getInputStream();
-
-        Map<String, String[]> parameters = request.getParameterMap();
-        for (String parameter : parameters.keySet()) {
-            String value = parameters.get(parameter)[0];
-            if (LOG.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) {
-                LOG.debug("Setting Header [Key, Value] as [{},{}] ", 
parameter, value);
-            }
-            headers.put(parameter, value);
-        }
-
-        for (String header : mandatoryHeaders) {
-            Preconditions.checkArgument(
-                    headers.containsKey(header), "Please specify " + header + 
" parameter in the request.");
-        }
-
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        try {
-            IOUtils.copy(inputStream, outputStream);
-            LOG.debug("Building an Event with stream of size -- {}", 
outputStream.size());
-            Event event = EventBuilder.withBody(outputStream.toByteArray(), 
headers);
-            event.setHeaders(headers);
-            List<Event> eventList = new ArrayList<Event>();
-            eventList.add(event);
-            return eventList;
-        } finally {
-            outputStream.close();
-            inputStream.close();
-        }
-    }
-
-    @Override
-    public void configure(Context context) {
-        this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, 
DEFAULT_MANDATORY_PARAMETERS);
-        this.mandatoryHeaders = 
commaSeparatedHeaders.split(PARAMETER_SEPARATOR);
-    }
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
deleted file mode 100644
index 61983382e..000000000
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import org.apache.flume.FlumeException;
-
-/**
- *
- * Exception thrown by an HTTP Handler if the request was not parsed correctly
- * into an event because the request was not in the expected format.
- *
- */
-public class HTTPBadRequestException extends FlumeException {
-
-    private static final long serialVersionUID = -3540764742069390951L;
-
-    public HTTPBadRequestException(String msg) {
-        super(msg);
-    }
-
-    public HTTPBadRequestException(String msg, Throwable th) {
-        super(msg, th);
-    }
-
-    public HTTPBadRequestException(Throwable th) {
-        super(th);
-    }
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
deleted file mode 100644
index 52c29ca9e..000000000
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.exception.ChannelException;
-import org.apache.flume.instrumentation.SourceCounter;
-import org.apache.flume.source.SslContextAwareAbstractSource;
-import org.apache.flume.tools.FlumeBeanConfigurator;
-import org.apache.flume.tools.HTTPServerConstraintUtil;
-import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.jmx.MBeanContainer;
-import org.eclipse.jetty.server.HttpConfiguration;
-import org.eclipse.jetty.server.HttpConnectionFactory;
-import org.eclipse.jetty.server.SecureRequestCustomizer;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.SslConnectionFactory;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A source which accepts Flume Events by HTTP POST and GET. GET should be used
- * for experimentation only. HTTP requests are converted into flume events by a
- * pluggable "handler" which must implement the
- * {@linkplain HTTPSourceHandler} interface. This handler takes a
- * {@linkplain HttpServletRequest} and returns a list of flume events.
- *
- * The source accepts the following parameters: <p> <tt>port</tt>: port to 
which
- * the server should bind. Mandatory <p> <tt>handler</tt>: the class that
- * deserializes a HttpServletRequest into a list of flume events. This class
- * must implement HTTPSourceHandler. Default:
- * {@linkplain JSONHandler}. <p> <tt>handler.*</tt> Any configuration
- * to be passed to the handler. <p>
- *
- * All events deserialized from one Http request are committed to the channel 
in
- * one transaction, thus allowing for increased efficiency on channels like the
- * file channel. If the handler throws an exception this source will return
- * a HTTP status of 400. If the channel is full, or the source is unable to
- * append events to the channel, the source will return a HTTP 503 - 
Temporarily
- * unavailable status.
- *
- * A JSON handler which converts JSON objects to Flume events is provided.
- *
- */
-public class HTTPSource extends SslContextAwareAbstractSource implements 
EventDrivenSource, Configurable {
-    /*
-     * There are 2 ways of doing this:
-     * a. Have a static server instance and use connectors in each source
-     *    which binds to the port defined for that source.
-     * b. Each source starts its own server instance, which binds to the 
source's
-     *    port.
-     *
-     * b is more efficient than a because Jetty does not allow binding a
-     * servlet to a connector. So each request will need to go through each
-     * each of the handlers/servlet till the correct one is found.
-     *
-     */
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(HTTPSource.class);
-    private volatile Integer port;
-    private volatile Server srv;
-    private volatile String host;
-    private HTTPSourceHandler handler;
-    private SourceCounter sourceCounter;
-
-    private Context sourceContext;
-
-    @Override
-    public void configure(Context context) {
-        configureSsl(context);
-        sourceContext = context;
-        try {
-            port = 
context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
-            host = context.getString(
-                    HTTPSourceConfigurationConstants.CONFIG_BIND, 
HTTPSourceConfigurationConstants.DEFAULT_BIND);
-
-            Preconditions.checkState(host != null && !host.isEmpty(), 
"HTTPSource hostname specified is empty");
-            Preconditions.checkNotNull(port, "HTTPSource requires a port 
number to be" + " specified");
-
-            String handlerClassName = context.getString(
-                            HTTPSourceConfigurationConstants.CONFIG_HANDLER,
-                            HTTPSourceConfigurationConstants.DEFAULT_HANDLER)
-                    .trim();
-
-            @SuppressWarnings("unchecked")
-            Class<? extends HTTPSourceHandler> clazz =
-                    (Class<? extends HTTPSourceHandler>) 
Class.forName(handlerClassName);
-            handler = clazz.getDeclaredConstructor().newInstance();
-
-            Map<String, String> subProps =
-                    
context.getSubProperties(HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX);
-            handler.configure(new Context(subProps));
-        } catch (ClassNotFoundException ex) {
-            LOG.error("Error while configuring HTTPSource. Exception 
follows.", ex);
-            Throwables.propagate(ex);
-        } catch (ClassCastException ex) {
-            LOG.error("Deserializer is not an instance of HTTPSourceHandler."
-                    + "Deserializer must implement HTTPSourceHandler.");
-            Throwables.propagate(ex);
-        } catch (Exception ex) {
-            LOG.error("Error configuring HTTPSource!", ex);
-            Throwables.propagate(ex);
-        }
-        if (sourceCounter == null) {
-            sourceCounter = new SourceCounter(getName());
-        }
-    }
-
-    @Override
-    public void start() {
-        Preconditions.checkState(
-                srv == null,
-                "Running HTTP Server found in source: " + getName()
-                        + " before I started one."
-                        + "Will not attempt to start.");
-        QueuedThreadPool threadPool = new QueuedThreadPool();
-        if (sourceContext.getSubProperties("QueuedThreadPool.").size() > 0) {
-            FlumeBeanConfigurator.setConfigurationFields(threadPool, 
sourceContext);
-        }
-        srv = new Server(threadPool);
-
-        // Register with JMX for advanced monitoring
-        MBeanContainer mbContainer = new 
MBeanContainer(ManagementFactory.getPlatformMBeanServer());
-        srv.addEventListener(mbContainer);
-        srv.addBean(mbContainer);
-
-        HttpConfiguration httpConfiguration = new HttpConfiguration();
-        httpConfiguration.addCustomizer(new SecureRequestCustomizer());
-
-        FlumeBeanConfigurator.setConfigurationFields(httpConfiguration, 
sourceContext);
-        ServerConnector connector = getSslContextSupplier()
-                .get()
-                .map(sslContext -> {
-                    SslContextFactory sslCtxFactory = new SslContextFactory();
-                    sslCtxFactory.setSslContext(sslContext);
-                    
sslCtxFactory.setExcludeProtocols(getExcludeProtocols().toArray(new String[] 
{}));
-                    
sslCtxFactory.setIncludeProtocols(getIncludeProtocols().toArray(new String[] 
{}));
-                    sslCtxFactory.setExcludeCipherSuites(
-                            getExcludeCipherSuites().toArray(new String[] {}));
-                    sslCtxFactory.setIncludeCipherSuites(
-                            getIncludeCipherSuites().toArray(new String[] {}));
-
-                    
FlumeBeanConfigurator.setConfigurationFields(sslCtxFactory, sourceContext);
-
-                    httpConfiguration.setSecurePort(port);
-                    httpConfiguration.setSecureScheme("https");
-
-                    return new ServerConnector(
-                            srv,
-                            new SslConnectionFactory(sslCtxFactory, 
HttpVersion.HTTP_1_1.asString()),
-                            new HttpConnectionFactory(httpConfiguration));
-                })
-                .orElse(new ServerConnector(srv, new 
HttpConnectionFactory(httpConfiguration)));
-
-        connector.setPort(port);
-        connector.setHost(host);
-        connector.setReuseAddress(true);
-
-        FlumeBeanConfigurator.setConfigurationFields(connector, sourceContext);
-
-        srv.addConnector(connector);
-
-        try {
-            ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
-            context.setContextPath("/");
-            srv.setHandler(context);
-
-            context.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
-            
context.setSecurityHandler(HTTPServerConstraintUtil.enforceConstraints());
-            srv.start();
-        } catch (Exception ex) {
-            LOG.error("Error while starting HTTPSource. Exception follows.", 
ex);
-            Throwables.propagate(ex);
-        }
-        Preconditions.checkArgument(srv.isRunning());
-        sourceCounter.start();
-        super.start();
-    }
-
-    @Override
-    public void stop() {
-        try {
-            srv.stop();
-            srv.join();
-            srv = null;
-        } catch (Exception ex) {
-            LOG.error("Error while stopping HTTPSource. Exception follows.", 
ex);
-        }
-        sourceCounter.stop();
-        LOG.info("Http source {} stopped. Metrics: {}", getName(), 
sourceCounter);
-    }
-
-    private class FlumeHTTPServlet extends HttpServlet {
-
-        private static final long serialVersionUID = 4891924863218790344L;
-
-        @Override
-        public void doPost(HttpServletRequest request, HttpServletResponse 
response) throws IOException {
-            List<Event> events = Collections.emptyList(); // create empty list
-            try {
-                events = handler.getEvents(request);
-            } catch (HTTPBadRequestException ex) {
-                LOG.warn("Received bad request from client. ", ex);
-                sourceCounter.incrementEventReadFail();
-                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Bad 
request from client.");
-                return;
-            } catch (Exception ex) {
-                LOG.warn("Deserializer threw unexpected exception. ", ex);
-                sourceCounter.incrementEventReadFail();
-                response.sendError(
-                        HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 
"Deserializer threw unexpected exception.");
-                return;
-            }
-            sourceCounter.incrementAppendBatchReceivedCount();
-            sourceCounter.addToEventReceivedCount(events.size());
-            try {
-                getChannelProcessor().processEventBatch(events);
-            } catch (ChannelException ex) {
-                LOG.warn(
-                        "Error appending event to channel. "
-                                + "Channel might be full. Consider increasing 
the channel "
-                                + "capacity or make sure the sinks perform 
faster.",
-                        ex);
-                sourceCounter.incrementChannelWriteFail();
-                response.sendError(
-                        HttpServletResponse.SC_SERVICE_UNAVAILABLE,
-                        "Error appending event to channel. Channel might be 
full.");
-                return;
-            } catch (Exception ex) {
-                LOG.warn("Unexpected error appending event to channel. ", ex);
-                sourceCounter.incrementGenericProcessingFail();
-                response.sendError(
-                        HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-                        "Unexpected error while appending event to channel.");
-                return;
-            }
-            response.setCharacterEncoding(request.getCharacterEncoding());
-            response.setStatus(HttpServletResponse.SC_OK);
-            response.flushBuffer();
-            sourceCounter.incrementAppendBatchAcceptedCount();
-            sourceCounter.addToEventAcceptedCount(events.size());
-        }
-
-        @Override
-        public void doGet(HttpServletRequest request, HttpServletResponse 
response) throws IOException {
-            doPost(request, response);
-        }
-    }
-
-    @Override
-    protected void configureSsl(Context context) {
-        handleDeprecatedParameter(context, "ssl", "enableSSL");
-        handleDeprecatedParameter(context, "exclude-protocols", 
"excludeProtocols");
-        handleDeprecatedParameter(context, "keystore-password", 
"keystorePassword");
-
-        super.configureSsl(context);
-    }
-
-    private void handleDeprecatedParameter(Context context, String newParam, 
String oldParam) {
-        if (!context.containsKey(newParam) && context.containsKey(oldParam)) {
-            context.put(newParam, context.getString(oldParam));
-        }
-    }
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
deleted file mode 100644
index 0c6484ded..000000000
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-/**
- *
- */
-public class HTTPSourceConfigurationConstants {
-
-    public static final String CONFIG_PORT = "port";
-    public static final String CONFIG_HANDLER = "handler";
-    public static final String CONFIG_HANDLER_PREFIX = CONFIG_HANDLER + ".";
-
-    public static final String CONFIG_BIND = "bind";
-
-    public static final String DEFAULT_BIND = "0.0.0.0";
-
-    public static final String DEFAULT_HANDLER = 
"org.apache.flume.source.http.JSONHandler";
-
-    @Deprecated
-    public static final String SSL_KEYSTORE = "keystore";
-
-    @Deprecated
-    public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword";
-
-    @Deprecated
-    public static final String SSL_ENABLED = "enableSSL";
-
-    @Deprecated
-    public static final String EXCLUDE_PROTOCOLS = "excludeProtocols";
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
deleted file mode 100644
index b0aa5a079..000000000
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import java.util.List;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-
-/**
- *
- */
-public interface HTTPSourceHandler extends Configurable {
-
-    /**
-     * Takes an {@linkplain HttpServletRequest} and returns a list of Flume
-     * Events. If this request cannot be parsed into Flume events based on the
-     * format this method will throw an exception. This method may also throw 
an
-     * exception if there is some sort of other error. <p>
-     *
-     * @param request The request to be parsed into Flume events.
-     * @return List of Flume events generated from the request.
-     * @throws HTTPBadRequestException If the was not parsed correctly into an
-     * event because the request was not in the expected format.
-     * @throws Exception If there was an unexpected error.
-     */
-    public List<Event> getEvents(HttpServletRequest request) throws 
HTTPBadRequestException, Exception;
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
deleted file mode 100644
index 961766a0f..000000000
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.reflect.TypeToken;
-import java.io.BufferedReader;
-import java.lang.reflect.Type;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.ArrayList;
-import java.util.List;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.event.JSONEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JSONHandler for HTTPSource that accepts an array of events.
- *
- * This handler throws exception if the deserialization fails because of bad
- * format or any other reason.
- *
- * Each event must be encoded as a map with two key-value pairs. <p> 1. headers
- * - the key for this key-value pair is "headers". The value for this key is
- * another map, which represent the event headers. These headers are inserted
- * into the Flume event as is. <p> 2. body - The body is a string which
- * represents the body of the event. The key for this key-value pair is "body".
- * All key-value pairs are considered to be headers. An example: <p> 
[{"headers"
- * : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body":
- * "random_body2"}] <p> would be interpreted as the following two flume events:
- * <p> * Event with body: "random_body" (in UTF-8/UTF-16/UTF-32 encoded bytes)
- * and headers : (a:b, c:d) <p> *
- * Event with body: "random_body2" (in UTF-8/UTF-16/UTF-32 encoded bytes) and
- * headers : (e:f) <p>
- *
- * The charset of the body is read from the request and used. If no charset is
- * set in the request, then the charset is assumed to be JSON's default - 
UTF-8.
- * The JSON handler supports UTF-8, UTF-16 and UTF-32.
- *
- * To set the charset, the request must have content type specified as
- * "application/json; charset=UTF-8" (replace UTF-8 with UTF-16 or UTF-32 as
- * required).
- *
- * One way to create an event in the format expected by this handler, is to
- * use {@linkplain JSONEvent} and use {@linkplain Gson} to create the JSON
- * string using the
- * {@linkplain Gson#toJson(java.lang.Object, java.lang.reflect.Type) }
- * method. The type token to pass as the 2nd argument of this method
- * for list of events can be created by: <p>
- * {@code
- * Type type = new TypeToken<List<JSONEvent>>() {}.getType();
- * }
- */
-public class JSONHandler implements HTTPSourceHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(JSONHandler.class);
-    private final Type listType = new TypeToken<List<JSONEvent>>() 
{}.getType();
-    private final Gson gson;
-
-    public JSONHandler() {
-        gson = new GsonBuilder().disableHtmlEscaping().create();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Event> getEvents(HttpServletRequest request) throws Exception {
-        BufferedReader reader = request.getReader();
-        String charset = request.getCharacterEncoding();
-        // UTF-8 is default for JSON. If no charset is specified, UTF-8 is to
-        // be assumed.
-        if (charset == null) {
-            LOG.debug("Charset is null, default charset of UTF-8 will be 
used.");
-            charset = "UTF-8";
-        } else if (!(charset.equalsIgnoreCase("utf-8")
-                || charset.equalsIgnoreCase("utf-16")
-                || charset.equalsIgnoreCase("utf-32"))) {
-            LOG.error(
-                    "Unsupported character set in request {}. "
-                            + "JSON handler supports UTF-8, "
-                            + "UTF-16 and UTF-32 only.",
-                    charset);
-            throw new UnsupportedCharsetException("JSON handler supports 
UTF-8, " + "UTF-16 and UTF-32 only.");
-        }
-
-        /*
-         * Gson throws Exception if the data is not parseable to JSON.
-         * Need not catch it since the source will catch it and return error.
-         */
-        List<Event> eventList = new ArrayList<Event>(0);
-        try {
-            eventList = gson.fromJson(reader, listType);
-        } catch (JsonSyntaxException ex) {
-            throw new HTTPBadRequestException("Request has invalid JSON 
Syntax.", ex);
-        }
-
-        for (Event e : eventList) {
-            ((JSONEvent) e).setCharset(charset);
-        }
-        return getSimpleEvents(eventList);
-    }
-
-    @Override
-    public void configure(Context context) {}
-
-    private List<Event> getSimpleEvents(List<Event> events) {
-        List<Event> newEvents = new ArrayList<Event>(events.size());
-        for (Event e : events) {
-            newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders()));
-        }
-        return newEvents;
-    }
-}
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java
 
b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java
index 8dfef6a54..d1ba83bac 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java
@@ -16,11 +16,9 @@
  */
 package org.apache.flume.tools;
 
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.util.security.Constraint;
-
-// Most of the code in this class is copied from HBASE-10473
+import org.eclipse.jetty.ee11.servlet.security.ConstraintMapping;
+import org.eclipse.jetty.ee11.servlet.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.Constraint;
 
 /**
  * Utility class to define constraints on Jetty HTTP servers
@@ -34,22 +32,24 @@ public class HTTPServerConstraintUtil {
      * @return ConstraintSecurityHandler for use with Jetty servlet
      */
     public static ConstraintSecurityHandler enforceConstraints() {
-        Constraint c = new Constraint();
-        c.setAuthenticate(true);
+        // 1. Create a constraint that denies TRACE and OPTIONS access
+        Constraint constraint = Constraint.from("Deny Methods", 
Constraint.Authorization.FORBIDDEN);
 
-        ConstraintMapping cmt = new ConstraintMapping();
-        cmt.setConstraint(c);
-        cmt.setMethod("TRACE");
-        cmt.setPathSpec("/*");
+        // 2. Map the constraint to methods TRACE and OPTIONS on all paths
+        ConstraintMapping traceMapping = new ConstraintMapping();
+        traceMapping.setPathSpec("/*");
+        traceMapping.setMethod("TRACE");
+        traceMapping.setConstraint(constraint);
 
-        ConstraintMapping cmo = new ConstraintMapping();
-        cmo.setConstraint(c);
-        cmo.setMethod("OPTIONS");
-        cmo.setPathSpec("/*");
+        ConstraintMapping optionsMapping = new ConstraintMapping();
+        optionsMapping.setPathSpec("/*");
+        optionsMapping.setMethod("OPTIONS");
+        optionsMapping.setConstraint(constraint);
 
-        ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
-        sh.setConstraintMappings(new ConstraintMapping[] {cmt, cmo});
+        // 3. Configure the ConstraintSecurityHandler
+        ConstraintSecurityHandler securityHandler = new 
ConstraintSecurityHandler();
+        securityHandler.setConstraintMappings(new 
ConstraintMapping[]{traceMapping, optionsMapping});
 
-        return sh;
+        return securityHandler;
     }
 }
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
index 3cb5e4bd8..94b1ebe22 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -24,7 +24,7 @@ import java.lang.reflect.Type;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
-import javax.servlet.http.HttpServletResponse;
+import jakarta.servlet.http.HttpServletResponse;
 import org.apache.flume.Context;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXTestUtils;
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
index dfd7fd7a7..b4f0e58f1 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
@@ -18,7 +18,6 @@ package org.apache.flume.source;
 
 import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
-import org.apache.flume.source.http.HTTPSource;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,13 +62,9 @@ public class TestDefaultSourceFactory {
         verifySourceCreation("netcat-src", "netcat", NetcatSource.class);
         verifySourceCreation("netcat-udp-src", "netcatudp", 
NetcatUdpSource.class);
         verifySourceCreation("exec-src", "exec", ExecSource.class);
-        // verifySourceCreation("avro-src", "avro", AvroSource.class);
         verifySourceCreation("syslogtcp-src", "syslogtcp", 
SyslogTcpSource.class);
         verifySourceCreation("multiport_syslogtcp-src", "multiport_syslogtcp", 
MultiportSyslogTCPSource.class);
         verifySourceCreation("syslogudp-src", "syslogudp", 
SyslogUDPSource.class);
-        // verifySourceCreation("spooldir-src", "spooldir", 
SpoolDirectorySource.class);
-        verifySourceCreation("http-src", "http", HTTPSource.class);
-        // verifySourceCreation("thrift-src", "thrift", ThriftSource.class);
         verifySourceCreation("custom-src", 
MockSource.class.getCanonicalName(), MockSource.class);
     }
 }
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
deleted file mode 100644
index e13b739fa..000000000
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.security.Principal;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.Locale;
-import java.util.Map;
-import javax.servlet.AsyncContext;
-import javax.servlet.DispatcherType;
-import javax.servlet.RequestDispatcher;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-import javax.servlet.http.HttpUpgradeHandler;
-import javax.servlet.http.Part;
-
-/**
- *
- */
-class FlumeHttpServletRequestWrapper implements HttpServletRequest {
-
-    private BufferedReader reader;
-
-    String charset;
-
-    public FlumeHttpServletRequestWrapper(String data, String charset) throws 
UnsupportedEncodingException {
-        reader = new BufferedReader(new InputStreamReader(new 
ByteArrayInputStream(data.getBytes(charset)), charset));
-        this.charset = charset;
-    }
-
-    public FlumeHttpServletRequestWrapper(String data) throws 
UnsupportedEncodingException {
-        this(data, "UTF-8");
-    }
-
-    @Override
-    public String getAuthType() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Cookie[] getCookies() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public long getDateHeader(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getHeader(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Enumeration<String> getHeaders(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Enumeration<String> getHeaderNames() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public int getIntHeader(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getMethod() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getPathInfo() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getPathTranslated() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getContextPath() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getQueryString() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getRemoteUser() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isUserInRole(String role) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Principal getUserPrincipal() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getRequestedSessionId() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getRequestURI() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public StringBuffer getRequestURL() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getServletPath() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public HttpSession getSession(boolean create) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public HttpSession getSession() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isRequestedSessionIdValid() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isRequestedSessionIdFromCookie() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isRequestedSessionIdFromURL() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isRequestedSessionIdFromUrl() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Object getAttribute(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Enumeration<String> getAttributeNames() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getCharacterEncoding() {
-        return charset;
-    }
-
-    @Override
-    public void setCharacterEncoding(String env) throws 
UnsupportedEncodingException {
-        this.charset = env;
-    }
-
-    @Override
-    public int getContentLength() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getContentType() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public ServletInputStream getInputStream() throws IOException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getParameter(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Enumeration<String> getParameterNames() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String[] getParameterValues(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Map<String, String[]> getParameterMap() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getProtocol() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getScheme() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getServerName() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public int getServerPort() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public BufferedReader getReader() throws IOException {
-        return reader;
-    }
-
-    @Override
-    public String getRemoteAddr() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getRemoteHost() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void setAttribute(String name, Object o) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void removeAttribute(String name) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Locale getLocale() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Enumeration<Locale> getLocales() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isSecure() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public RequestDispatcher getRequestDispatcher(String path) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getRealPath(String path) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public int getRemotePort() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getLocalName() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String getLocalAddr() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public int getLocalPort() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public AsyncContext getAsyncContext() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public long getContentLengthLong() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public DispatcherType getDispatcherType() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public ServletContext getServletContext() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isAsyncStarted() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean isAsyncSupported() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public AsyncContext startAsync() throws IllegalStateException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public AsyncContext startAsync(ServletRequest arg0, ServletResponse arg1) 
throws IllegalStateException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean authenticate(HttpServletResponse arg0) throws IOException, 
ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public String changeSessionId() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Part getPart(String arg0) throws IOException, ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public Collection<Part> getParts() throws IOException, ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void login(String arg0, String arg1) throws ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void logout() throws ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public <T extends HttpUpgradeHandler> T upgrade(Class<T> arg0) throws 
IOException, ServletException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-}
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
deleted file mode 100644
index 655bd7d32..000000000
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.servlet.ReadListener;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- */
-public class TestBLOBHandler {
-
-    HTTPSourceHandler handler;
-
-    @Before
-    public void setUp() {
-        handler = new BLOBHandler();
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Test
-    public void testCSVData() throws Exception {
-        Map requestParameterMap = new HashMap();
-        requestParameterMap.put("param1", new String[] {"value1"});
-        requestParameterMap.put("param2", new String[] {"value2"});
-
-        HttpServletRequest req = mock(HttpServletRequest.class);
-        final String csvData = "a,b,c";
-
-        ServletInputStream servletInputStream =
-                new DelegatingServletInputStream(new 
ByteArrayInputStream(csvData.getBytes()));
-
-        when(req.getInputStream()).thenReturn(servletInputStream);
-        when(req.getParameterMap()).thenReturn(requestParameterMap);
-
-        Context context = mock(Context.class);
-        when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, 
BLOBHandler.DEFAULT_MANDATORY_PARAMETERS))
-                .thenReturn("param1,param2");
-
-        handler.configure(context);
-        List<Event> deserialized = handler.getEvents(req);
-        assertEquals(1, deserialized.size());
-        Event e = deserialized.get(0);
-
-        assertEquals(new String(e.getBody()), csvData);
-        assertEquals(e.getHeaders().get("param1"), "value1");
-        assertEquals(e.getHeaders().get("param2"), "value2");
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Test
-    public void testTabData() throws Exception {
-        Map requestParameterMap = new HashMap();
-        requestParameterMap.put("param1", new String[] {"value1"});
-
-        HttpServletRequest req = mock(HttpServletRequest.class);
-        final String tabData = "a\tb\tc";
-
-        ServletInputStream servletInputStream =
-                new DelegatingServletInputStream(new 
ByteArrayInputStream(tabData.getBytes()));
-
-        when(req.getInputStream()).thenReturn(servletInputStream);
-        when(req.getParameterMap()).thenReturn(requestParameterMap);
-
-        Context context = mock(Context.class);
-        when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, 
BLOBHandler.DEFAULT_MANDATORY_PARAMETERS))
-                .thenReturn("param1");
-
-        handler.configure(context);
-
-        List<Event> deserialized = handler.getEvents(req);
-        assertEquals(1, deserialized.size());
-        Event e = deserialized.get(0);
-
-        assertEquals(new String(e.getBody()), tabData);
-        assertEquals(e.getHeaders().get("param1"), "value1");
-    }
-
-    @SuppressWarnings({"rawtypes"})
-    @Test(expected = IllegalArgumentException.class)
-    public void testMissingParameters() throws Exception {
-        Map requestParameterMap = new HashMap();
-
-        HttpServletRequest req = mock(HttpServletRequest.class);
-        final String tabData = "a\tb\tc";
-
-        ServletInputStream servletInputStream =
-                new DelegatingServletInputStream(new 
ByteArrayInputStream(tabData.getBytes()));
-
-        when(req.getInputStream()).thenReturn(servletInputStream);
-        when(req.getParameterMap()).thenReturn(requestParameterMap);
-
-        Context context = mock(Context.class);
-        when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, 
BLOBHandler.DEFAULT_MANDATORY_PARAMETERS))
-                .thenReturn("param1");
-
-        handler.configure(context);
-
-        handler.getEvents(req);
-    }
-
-    class DelegatingServletInputStream extends ServletInputStream {
-
-        private final InputStream sourceStream;
-
-        /**
-         * Create a DelegatingServletInputStream for the given source stream.
-         *
-         * @param sourceStream
-         *          the source stream (never <code>null</code>)
-         */
-        public DelegatingServletInputStream(InputStream sourceStream) {
-            this.sourceStream = sourceStream;
-        }
-
-        /**
-         * Return the underlying source stream (never <code>null</code>).
-         */
-        public final InputStream getSourceStream() {
-            return this.sourceStream;
-        }
-
-        public int read() throws IOException {
-            return this.sourceStream.read();
-        }
-
-        public void close() throws IOException {
-            super.close();
-            this.sourceStream.close();
-        }
-
-        public boolean isFinished() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        public boolean isReady() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        public void setReadListener(ReadListener arg0) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-    }
-}
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
deleted file mode 100644
index 7fda74cf0..000000000
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ /dev/null
@@ -1,732 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import static org.fest.reflect.core.Reflection.field;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.doThrow;
-
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.Type;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.Query;
-import javax.management.QueryExp;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.JSONEvent;
-import org.apache.flume.instrumentation.SourceCounter;
-import org.apache.flume.util.Whitebox;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpOptions;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.methods.HttpTrace;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- *
- */
-public class TestHTTPSource {
-
-    private static HTTPSource httpSource;
-    private static HTTPSource httpsSource;
-    private static HTTPSource httpsGlobalKeystoreSource;
-
-    private static Channel httpChannel;
-    private static Channel httpsChannel;
-    private static Channel httpsGlobalKeystoreChannel;
-    private static int httpPort;
-    private static int httpsPort;
-    private static int httpsGlobalKeystorePort;
-    private HttpClient httpClient;
-    private HttpPost postRequest;
-
-    private static int findFreePort() throws IOException {
-        ServerSocket socket = new ServerSocket(0);
-        int port = socket.getLocalPort();
-        socket.close();
-        return port;
-    }
-
-    private static Context getDefaultNonSecureContext(int port) throws 
IOException {
-        Context ctx = new Context();
-        ctx.put(HTTPSourceConfigurationConstants.CONFIG_BIND, "0.0.0.0");
-        ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, 
String.valueOf(port));
-        ctx.put("QueuedThreadPool.MaxThreads", "100");
-        return ctx;
-    }
-
-    private static Context getDefaultSecureContext(int port) throws 
IOException {
-        Context sslContext = new Context();
-        sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, 
String.valueOf(port));
-        sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
-        sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, 
"password");
-        sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, 
"src/test/resources/jettykeystore");
-        return sslContext;
-    }
-
-    private static Context getDefaultSecureContextGlobalKeystore(int port) 
throws IOException {
-        System.setProperty("javax.net.ssl.keyStore", 
"src/test/resources/jettykeystore");
-        System.setProperty("javax.net.ssl.keyStorePassword", "password");
-
-        Context sslContext = new Context();
-        sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, 
String.valueOf(port));
-        sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
-        return sslContext;
-    }
-
-    @BeforeClass
-    public static void setUpClass() throws Exception {
-        httpSource = new HTTPSource();
-        httpChannel = new MemoryChannel();
-        httpPort = findFreePort();
-        configureSourceAndChannel(httpSource, httpChannel, 
getDefaultNonSecureContext(httpPort));
-        httpChannel.start();
-        httpSource.start();
-
-        httpsSource = new HTTPSource();
-        httpsChannel = new MemoryChannel();
-        httpsPort = findFreePort();
-        configureSourceAndChannel(httpsSource, httpsChannel, 
getDefaultSecureContext(httpsPort));
-        httpsChannel.start();
-        httpsSource.start();
-
-        httpsGlobalKeystoreSource = new HTTPSource();
-        httpsGlobalKeystoreChannel = new MemoryChannel();
-        httpsGlobalKeystorePort = findFreePort();
-        configureSourceAndChannel(
-                httpsGlobalKeystoreSource,
-                httpsGlobalKeystoreChannel,
-                
getDefaultSecureContextGlobalKeystore(httpsGlobalKeystorePort));
-        httpsGlobalKeystoreChannel.start();
-        httpsGlobalKeystoreSource.start();
-
-        System.clearProperty("javax.net.ssl.keyStore");
-        System.clearProperty("javax.net.ssl.keyStorePassword");
-    }
-
-    private static void configureSourceAndChannel(HTTPSource source, Channel 
channel, Context context) {
-        Context channelContext = new Context();
-        channelContext.put("capacity", "100");
-        Configurables.configure(channel, channelContext);
-        Configurables.configure(source, context);
-
-        ChannelSelector rcs1 = new ReplicatingChannelSelector();
-        rcs1.setChannels(Collections.singletonList(channel));
-
-        source.setChannelProcessor(new ChannelProcessor(rcs1));
-    }
-
-    @AfterClass
-    public static void tearDownClass() throws Exception {
-        httpSource.stop();
-        httpChannel.stop();
-        httpsSource.stop();
-        httpsChannel.stop();
-        httpsGlobalKeystoreSource.stop();
-        httpsGlobalKeystoreChannel.stop();
-    }
-
-    @Before
-    public void setUp() {
-        HttpClientBuilder builder = HttpClientBuilder.create();
-        httpClient = builder.build();
-        postRequest = new HttpPost("http://0.0.0.0:"; + httpPort);
-        SourceCounter sc = (SourceCounter) 
Whitebox.getInternalState(httpSource, "sourceCounter");
-        sc.start();
-    }
-
-    @After
-    public void tearDown() {
-        SourceCounter sc = (SourceCounter) 
Whitebox.getInternalState(httpSource, "sourceCounter");
-        sc.stop();
-    }
-
-    @Test
-    public void testSimple() throws IOException, InterruptedException {
-
-        StringEntity input = new StringEntity("[{\"headers\":{\"a\": 
\"b\"},\"body\": \"random_body\"},"
-                + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]");
-        // if we do not set the content type to JSON, the client will use
-        // ISO-8859-1 as the charset. JSON standard does not support this.
-        input.setContentType("application/json");
-        postRequest.setEntity(input);
-
-        HttpResponse response = httpClient.execute(postRequest);
-
-        Assert.assertEquals(HttpServletResponse.SC_OK, 
response.getStatusLine().getStatusCode());
-        Transaction tx = httpChannel.getTransaction();
-        tx.begin();
-        Event e = httpChannel.take();
-        Assert.assertNotNull(e);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-
-        e = httpChannel.take();
-        Assert.assertNotNull(e);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
-        tx.commit();
-        tx.close();
-    }
-
-    @Test
-    public void testTrace() throws Exception {
-        doTestForbidden(new HttpTrace("http://0.0.0.0:"; + httpPort));
-    }
-
-    @Test
-    public void testOptions() throws Exception {
-        doTestForbidden(new HttpOptions("http://0.0.0.0:"; + httpPort));
-    }
-
-    private void doTestForbidden(HttpRequestBase request) throws Exception {
-        HttpResponse response = httpClient.execute(request);
-        Assert.assertEquals(
-                HttpServletResponse.SC_FORBIDDEN, 
response.getStatusLine().getStatusCode());
-    }
-
-    @Test
-    public void testSimpleUTF16() throws IOException, InterruptedException {
-
-        StringEntity input = new StringEntity(
-                "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
-                        + "{\"headers\":{\"e\": \"f\"},\"body\": 
\"random_body2\"}]",
-                "UTF-16");
-        input.setContentType("application/json; charset=utf-16");
-        postRequest.setEntity(input);
-
-        HttpResponse response = httpClient.execute(postRequest);
-
-        Assert.assertEquals(HttpServletResponse.SC_OK, 
response.getStatusLine().getStatusCode());
-        Transaction tx = httpChannel.getTransaction();
-        tx.begin();
-        Event e = httpChannel.take();
-        Assert.assertNotNull(e);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
-
-        e = httpChannel.take();
-        Assert.assertNotNull(e);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
-        tx.commit();
-        tx.close();
-    }
-
-    @Test
-    public void testInvalid() throws Exception {
-        StringEntity input = new StringEntity("[{\"a\": 
\"b\",[\"d\":\"e\"],\"body\": \"random_body\"},"
-                + "{\"e\": \"f\",\"body\": \"random_body2\"}]");
-        input.setContentType("application/json");
-        postRequest.setEntity(input);
-        HttpResponse response = httpClient.execute(postRequest);
-
-        Assert.assertEquals(
-                HttpServletResponse.SC_BAD_REQUEST, 
response.getStatusLine().getStatusCode());
-        SourceCounter sc = (SourceCounter) 
Whitebox.getInternalState(httpSource, "sourceCounter");
-        Assert.assertEquals(1, sc.getEventReadFail());
-    }
-
-    @Test
-    public void testBigBatchDeserializationUTF8() throws Exception {
-        testBatchWithVariousEncoding("UTF-8");
-    }
-
-    @Test
-    public void testBigBatchDeserializationUTF16() throws Exception {
-        testBatchWithVariousEncoding("UTF-16");
-    }
-
-    @Test
-    public void testBigBatchDeserializationUTF32() throws Exception {
-        testBatchWithVariousEncoding("UTF-32");
-    }
-
-    @Test
-    public void testCounterGenericFail() throws Exception {
-        ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
-        doThrow(new 
RuntimeException("dummy")).when(cp).processEventBatch(anyList());
-        ChannelProcessor oldCp = httpSource.getChannelProcessor();
-        httpSource.setChannelProcessor(cp);
-        testBatchWithVariousEncoding("UTF-8");
-        SourceCounter sc = (SourceCounter) 
Whitebox.getInternalState(httpSource, "sourceCounter");
-        Assert.assertEquals(1, sc.getGenericProcessingFail());
-        httpSource.setChannelProcessor(oldCp);
-    }
-
-    @Test
-    public void testSingleEvent() throws Exception {
-        StringEntity input = new StringEntity("[{\"headers\" : {\"a\": 
\"b\"},\"body\":" + " \"random_body\"}]");
-        input.setContentType("application/json");
-        postRequest.setEntity(input);
-
-        httpClient.execute(postRequest);
-        Transaction tx = httpChannel.getTransaction();
-        tx.begin();
-        Event e = httpChannel.take();
-        Assert.assertNotNull(e);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-        tx.commit();
-        tx.close();
-    }
-
-    /**
-     * First test that the unconfigured behaviour is as-expected, then add 
configurations
-     * to a new channel and observe the difference.
-     * For some of the properties, the most convenient way to test is using 
the MBean interface
-     * We test all of HttpConfiguration, ServerConnector, QueuedThreadPool and 
SslContextFactory
-     * sub-configurations (but not all properties)
-     */
-    @Test
-    public void testConfigurables() throws Exception {
-        StringEntity input = new StringEntity("[{\"headers\" : {\"a\": 
\"b\"},\"body\":" + " \"random_body\"}]");
-        input.setContentType("application/json");
-        postRequest.setEntity(input);
-
-        HttpResponse resp = httpClient.execute(postRequest);
-
-        // Testing default behaviour (to not provided X-Powered-By, but to 
provide Server headers)
-        Assert.assertTrue(resp.getHeaders("X-Powered-By").length == 0);
-        Assert.assertTrue(resp.getHeaders("Server").length == 1);
-
-        Transaction tx = httpChannel.getTransaction();
-        tx.begin();
-        Event e = httpChannel.take();
-        Assert.assertNotNull(e);
-        tx.commit();
-        tx.close();
-        
Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*",
 "maxThreads", 123)
-                        .size()
-                == 0);
-        
Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", 
"acceptQueueSize", 22)
-                        .size()
-                == 0);
-
-        int newPort = findFreePort();
-        Context configuredSourceContext = getDefaultNonSecureContext(newPort);
-        configuredSourceContext.put("HttpConfiguration.sendServerVersion", 
"false");
-        configuredSourceContext.put("HttpConfiguration.sendXPoweredBy", 
"true");
-        configuredSourceContext.put("ServerConnector.acceptQueueSize", "22");
-        configuredSourceContext.put("QueuedThreadPool.maxThreads", "123");
-
-        HTTPSource newSource = new HTTPSource();
-        Channel newChannel = new MemoryChannel();
-        configureSourceAndChannel(newSource, newChannel, 
configuredSourceContext);
-        newChannel.start();
-        newSource.start();
-
-        HttpPost newPostRequest = new HttpPost("http://0.0.0.0:"; + newPort);
-
-        resp = httpClient.execute(newPostRequest);
-        Assert.assertTrue(resp.getHeaders("X-Powered-By").length > 0);
-        Assert.assertTrue(resp.getHeaders("Server").length == 0);
-        
Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*",
 "maxThreads", 123)
-                        .size()
-                == 1);
-        
Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", 
"acceptQueueSize", 22)
-                        .size()
-                == 1);
-
-        newSource.stop();
-        newChannel.stop();
-
-        // Configure SslContextFactory with junk protocols (expect failure)
-        newPort = findFreePort();
-        configuredSourceContext = getDefaultSecureContext(newPort);
-        configuredSourceContext.put("SslContextFactory.IncludeProtocols", "abc 
def");
-
-        newSource = new HTTPSource();
-        newChannel = new MemoryChannel();
-
-        configureSourceAndChannel(newSource, newChannel, 
configuredSourceContext);
-
-        newChannel.start();
-        newSource.start();
-
-        newPostRequest = new HttpPost("http://0.0.0.0:"; + newPort);
-        try {
-            doTestHttps(null, newPort, httpsChannel);
-            // We are testing that this fails because we've deliberately 
configured the wrong protocols
-            Assert.assertTrue(false);
-        } catch (AssertionError ex) {
-            // no-op
-        }
-        newSource.stop();
-        newChannel.stop();
-    }
-
-    @Test
-    public void testFullChannel() throws Exception {
-        HttpResponse response = putWithEncoding("UTF-8", 150).response;
-        Assert.assertEquals(
-                HttpServletResponse.SC_SERVICE_UNAVAILABLE,
-                response.getStatusLine().getStatusCode());
-        SourceCounter sc = (SourceCounter) 
Whitebox.getInternalState(httpSource, "sourceCounter");
-        Assert.assertEquals(1, sc.getChannelWriteFail());
-    }
-
-    @Test
-    public void testFail() throws Exception {
-        HTTPSourceHandler handler =
-                
field("handler").ofType(HTTPSourceHandler.class).in(httpSource).get();
-        // Cause an exception in the source - this is equivalent to any 
exception
-        // thrown by the handler since the handler is called inside a try-catch
-        
field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(null);
-        HttpResponse response = putWithEncoding("UTF-8", 1).response;
-        Assert.assertEquals(
-                HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-                response.getStatusLine().getStatusCode());
-        // Set the original handler back so tests don't fail after this runs.
-        
field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(handler);
-    }
-
-    @Test
-    public void testMBeans() throws Exception {
-        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
-        ObjectName objectName = new ObjectName("org.eclipse.jetty.*:*");
-        Set<ObjectInstance> queryMBeans = mbeanServer.queryMBeans(objectName, 
null);
-        Assert.assertTrue(queryMBeans.size() > 0);
-    }
-
-    @Test
-    public void testHandlerThrowingException() throws Exception {
-        // This will cause the handler to throw an
-        // UnsupportedCharsetException.
-        HttpResponse response = putWithEncoding("ISO-8859-1", 150).response;
-        Assert.assertEquals(
-                HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-                response.getStatusLine().getStatusCode());
-    }
-
-    private Set<ObjectInstance> findMBeans(String name, String attribute, int 
value)
-            throws MalformedObjectNameException {
-        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
-        ObjectName objectName = new ObjectName(name);
-        QueryExp q = Query.eq(Query.attr(attribute), Query.value(value));
-        return mbeanServer.queryMBeans(objectName, q);
-    }
-
-    private ResultWrapper putWithEncoding(String encoding, int n) throws 
Exception {
-        Type listType = new TypeToken<List<JSONEvent>>() {}.getType();
-        List<JSONEvent> events = new ArrayList<JSONEvent>();
-        Random rand = new Random();
-        for (int i = 0; i < n; i++) {
-            Map<String, String> input = Maps.newHashMap();
-            for (int j = 0; j < 10; j++) {
-                input.put(String.valueOf(i) + String.valueOf(j), 
String.valueOf(i));
-            }
-            JSONEvent e = new JSONEvent();
-            e.setHeaders(input);
-            e.setBody(String.valueOf(rand.nextGaussian()).getBytes(encoding));
-            events.add(e);
-        }
-        Gson gson = new Gson();
-        String json = gson.toJson(events, listType);
-        StringEntity input = new StringEntity(json);
-        input.setContentType("application/json; charset=" + encoding);
-        postRequest.setEntity(input);
-        HttpResponse resp = httpClient.execute(postRequest);
-        return new ResultWrapper(resp, events);
-    }
-
-    @Test
-    public void testHttps() throws Exception {
-        doTestHttps(null, httpsPort, httpsChannel);
-    }
-
-    @Test(expected = javax.net.ssl.SSLHandshakeException.class)
-    public void testHttpsSSLv3() throws Exception {
-        doTestHttps("SSLv3", httpsPort, httpsChannel);
-    }
-
-    @Test
-    public void testHttpsGlobalKeystore() throws Exception {
-        doTestHttps(null, httpsGlobalKeystorePort, httpsGlobalKeystoreChannel);
-    }
-
-    private void doTestHttps(String protocol, int port, Channel channel) 
throws Exception {
-        Type listType = new TypeToken<List<JSONEvent>>() {}.getType();
-        List<JSONEvent> events = new ArrayList<JSONEvent>();
-        Random rand = new Random();
-        for (int i = 0; i < 10; i++) {
-            Map<String, String> input = Maps.newHashMap();
-            for (int j = 0; j < 10; j++) {
-                input.put(String.valueOf(i) + String.valueOf(j), 
String.valueOf(i));
-            }
-            input.put("MsgNum", String.valueOf(i));
-            JSONEvent e = new JSONEvent();
-            e.setHeaders(input);
-            e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
-            events.add(e);
-        }
-        Gson gson = new Gson();
-        String json = gson.toJson(events, listType);
-        HttpsURLConnection httpsURLConnection = null;
-        Transaction transaction = null;
-        try {
-            TrustManager[] trustAllCerts = {
-                new X509TrustManager() {
-                    @Override
-                    public void 
checkClientTrusted(java.security.cert.X509Certificate[] x509Certificates, 
String s)
-                            throws CertificateException {
-                        // noop
-                    }
-
-                    @Override
-                    public void 
checkServerTrusted(java.security.cert.X509Certificate[] x509Certificates, 
String s)
-                            throws CertificateException {
-                        // noop
-                    }
-
-                    public java.security.cert.X509Certificate[] 
getAcceptedIssuers() {
-                        return null;
-                    }
-                }
-            };
-
-            SSLContext sc = null;
-            javax.net.ssl.SSLSocketFactory factory = null;
-            if (System.getProperty("java.vendor").contains("IBM")) {
-                sc = SSLContext.getInstance("SSL_TLS");
-            } else {
-                sc = SSLContext.getInstance("SSL");
-            }
-
-            HostnameVerifier hv = new HostnameVerifier() {
-                public boolean verify(String arg0, SSLSession arg1) {
-                    return true;
-                }
-            };
-            sc.init(null, trustAllCerts, new SecureRandom());
-
-            if (protocol != null) {
-                factory = new 
DisabledProtocolsSocketFactory(sc.getSocketFactory(), protocol);
-            } else {
-                factory = sc.getSocketFactory();
-            }
-            HttpsURLConnection.setDefaultSSLSocketFactory(factory);
-            
HttpsURLConnection.setDefaultHostnameVerifier(NoopHostnameVerifier.INSTANCE);
-            URL sslUrl = new URL("https://0.0.0.0:"; + port);
-            httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection();
-            httpsURLConnection.setDoInput(true);
-            httpsURLConnection.setDoOutput(true);
-            httpsURLConnection.setRequestMethod("POST");
-            httpsURLConnection.getOutputStream().write(json.getBytes());
-
-            int statusCode = httpsURLConnection.getResponseCode();
-            Assert.assertEquals(200, statusCode);
-
-            transaction = channel.getTransaction();
-            transaction.begin();
-            for (int i = 0; i < 10; i++) {
-                Event e = channel.take();
-                Assert.assertNotNull(e);
-                Assert.assertEquals(String.valueOf(i), 
e.getHeaders().get("MsgNum"));
-            }
-
-        } finally {
-            if (transaction != null) {
-                transaction.commit();
-                transaction.close();
-            }
-            httpsURLConnection.disconnect();
-        }
-    }
-
-    @Test
-    public void testHttpsSourceNonHttpsClient() throws Exception {
-        Type listType = new TypeToken<List<JSONEvent>>() {}.getType();
-        List<JSONEvent> events = new ArrayList<JSONEvent>();
-        Random rand = new Random();
-        for (int i = 0; i < 10; i++) {
-            Map<String, String> input = Maps.newHashMap();
-            for (int j = 0; j < 10; j++) {
-                input.put(String.valueOf(i) + String.valueOf(j), 
String.valueOf(i));
-            }
-            input.put("MsgNum", String.valueOf(i));
-            JSONEvent e = new JSONEvent();
-            e.setHeaders(input);
-            e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
-            events.add(e);
-        }
-        Gson gson = new Gson();
-        String json = gson.toJson(events, listType);
-        HttpURLConnection httpURLConnection = null;
-        try {
-            URL url = new URL("http://0.0.0.0:"; + httpsPort);
-            httpURLConnection = (HttpURLConnection) url.openConnection();
-            httpURLConnection.setDoInput(true);
-            httpURLConnection.setDoOutput(true);
-            httpURLConnection.setRequestMethod("POST");
-            httpURLConnection.getOutputStream().write(json.getBytes());
-            httpURLConnection.getResponseCode();
-
-            Assert.fail("HTTP Client cannot connect to HTTPS source");
-        } catch (Exception exception) {
-            Assert.assertTrue("Exception expected", true);
-        } finally {
-            httpURLConnection.disconnect();
-        }
-    }
-
-    private void takeWithEncoding(String encoding, int n, List<JSONEvent> 
events) throws Exception {
-        Transaction tx = httpChannel.getTransaction();
-        tx.begin();
-        Event e = null;
-        int i = 0;
-        while (true) {
-            e = httpChannel.take();
-            if (e == null) {
-                break;
-            }
-            Event current = events.get(i++);
-            Assert.assertEquals(new String(current.getBody(), encoding), new 
String(e.getBody(), encoding));
-            Assert.assertEquals(current.getHeaders(), e.getHeaders());
-        }
-        Assert.assertEquals(n, events.size());
-        tx.commit();
-        tx.close();
-    }
-
-    private void testBatchWithVariousEncoding(String encoding) throws 
Exception {
-        testBatchWithVariousEncoding(encoding, 50);
-    }
-
-    private void testBatchWithVariousEncoding(String encoding, int n) throws 
Exception {
-        List<JSONEvent> events = putWithEncoding(encoding, n).events;
-        takeWithEncoding(encoding, n, events);
-    }
-
-    private class ResultWrapper {
-        public final HttpResponse response;
-        public final List<JSONEvent> events;
-
-        public ResultWrapper(HttpResponse resp, List<JSONEvent> events) {
-            this.response = resp;
-            this.events = events;
-        }
-    }
-
-    private class DisabledProtocolsSocketFactory extends 
javax.net.ssl.SSLSocketFactory {
-
-        private final javax.net.ssl.SSLSocketFactory socketFactory;
-        private final String[] protocols;
-
-        DisabledProtocolsSocketFactory(javax.net.ssl.SSLSocketFactory factory, 
String protocol) {
-            this.socketFactory = factory;
-            protocols = new String[1];
-            protocols[0] = protocol;
-        }
-
-        @Override
-        public String[] getDefaultCipherSuites() {
-            return socketFactory.getDefaultCipherSuites();
-        }
-
-        @Override
-        public String[] getSupportedCipherSuites() {
-            return socketFactory.getSupportedCipherSuites();
-        }
-
-        @Override
-        public Socket createSocket(Socket socket, String s, int i, boolean b) 
throws IOException {
-            SSLSocket sc = (SSLSocket) socketFactory.createSocket(socket, s, 
i, b);
-            sc.setEnabledProtocols(protocols);
-            return sc;
-        }
-
-        @Override
-        public Socket createSocket(String s, int i) throws IOException, 
UnknownHostException {
-            SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i);
-            sc.setEnabledProtocols(protocols);
-            return sc;
-        }
-
-        @Override
-        public Socket createSocket(String s, int i, InetAddress inetAddress, 
int i2)
-                throws IOException, UnknownHostException {
-            SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i, 
inetAddress, i2);
-            sc.setEnabledProtocols(protocols);
-            return sc;
-        }
-
-        @Override
-        public Socket createSocket(InetAddress inetAddress, int i) throws 
IOException {
-            SSLSocket sc = (SSLSocket) socketFactory.createSocket(inetAddress, 
i);
-            sc.setEnabledProtocols(protocols);
-            return sc;
-        }
-
-        @Override
-        public Socket createSocket(InetAddress inetAddress, int i, InetAddress 
inetAddress2, int i2)
-                throws IOException {
-            SSLSocket sc = (SSLSocket) socketFactory.createSocket(
-                    inetAddress, i,
-                    inetAddress2, i2);
-            sc.setEnabledProtocols(protocols);
-            return sc;
-        }
-    }
-}
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
deleted file mode 100644
index d5f90f7e2..000000000
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.http;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import java.lang.reflect.Type;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import javax.servlet.http.HttpServletRequest;
-import junit.framework.Assert;
-import org.apache.flume.Event;
-import org.apache.flume.event.JSONEvent;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- */
-public class TestJSONHandler {
-
-    HTTPSourceHandler handler;
-
-    @Before
-    public void setUp() {
-        handler = new JSONHandler();
-    }
-
-    @Test
-    public void testMultipleEvents() throws Exception {
-        String json = "[{\"headers\":{\"a\": \"b\"},\"body\": 
\"random_body\"},"
-                + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testMultipleEventsUTF16() throws Exception {
-        String json = "[{\"headers\":{\"a\": \"b\"},\"body\": 
\"random_body\"},"
-                + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, 
"UTF-16");
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
-    }
-
-    @Test
-    public void testMultipleEventsUTF32() throws Exception {
-        String json = "[{\"headers\":{\"a\": \"b\"},\"body\": 
\"random_body\"},"
-                + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, 
"UTF-32");
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-32"));
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-32"));
-    }
-
-    @Test
-    public void testMultipleEventsUTF8() throws Exception {
-        String json = "[{\"headers\":{\"a\": \"b\"},\"body\": 
\"random_body\"},"
-                + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, 
"UTF-8");
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testEscapedJSON() throws Exception {
-        // JSON allows escaping double quotes to add it in the data.
-        String json = "[{\"headers\":{\"a\": \"b\"}}," + "{\"headers\":{\"e\": 
\"f\"},\"body\": \"rand\\\"om_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertTrue(e.getBody().length == 0);
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("rand\"om_body2", new String(e.getBody(), 
"UTF-8"));
-    }
-
-    @Test
-    public void testNoBody() throws Exception {
-        String json = "[{\"headers\" : {\"a\": \"b\"}}," + "{\"headers\" : 
{\"e\": \"f\"},\"body\": \"random_body2\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertTrue(e.getBody().length == 0);
-        e = deserialized.get(1);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testSingleHTMLEvent() throws Exception {
-        String json = "[{\"headers\": {\"a\": \"b\"}," + "\"body\": 
\"<html><body>test</body></html>\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("<html><body>test</body></html>", new 
String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testSingleEvent() throws Exception {
-        String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": 
\"random_body\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test(expected = HTTPBadRequestException.class)
-    public void testBadEvent() throws Exception {
-        String json = "{[\"a\": \"b\"],\"body\": \"random_body\"}";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        handler.getEvents(req);
-        Assert.fail();
-    }
-
-    @Test(expected = UnsupportedCharsetException.class)
-    public void testError() throws Exception {
-        String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": 
\"random_body\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, 
"ISO-8859-1");
-        handler.getEvents(req);
-        Assert.fail();
-    }
-
-    @Test
-    public void testSingleEventInArray() throws Exception {
-        String json = "[{\"headers\": {\"a\": \"b\"},\"body\": 
\"random_body\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testMultipleLargeEvents() throws Exception {
-        String json = "[{\"headers\" : {\"a\": \"b\", \"a2\": \"b2\","
-                + "\"a3\": \"b3\",\"a4\": \"b4\"},\"body\": \"random_body\"},"
-                + "{\"headers\" :{\"e\": \"f\",\"e2\": \"f2\","
-                + "\"e3\": \"f3\",\"e4\": \"f4\",\"e5\": \"f5\"},"
-                + "\"body\": \"random_body2\"},"
-                + "{\"headers\" :{\"q1\": \"b\",\"q2\": \"b2\",\"q3\": 
\"b3\",\"q4\": \"b4\"},"
-                + "\"body\": \"random_bodyq\"}]";
-        HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
-        List<Event> deserialized = handler.getEvents(req);
-        Event e = deserialized.get(0);
-        Assert.assertNotNull(e);
-        Assert.assertEquals("b", e.getHeaders().get("a"));
-        Assert.assertEquals("b2", e.getHeaders().get("a2"));
-        Assert.assertEquals("b3", e.getHeaders().get("a3"));
-        Assert.assertEquals("b4", e.getHeaders().get("a4"));
-        Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
-        e = deserialized.get(1);
-        Assert.assertNotNull(e);
-        Assert.assertEquals("f", e.getHeaders().get("e"));
-        Assert.assertEquals("f2", e.getHeaders().get("e2"));
-        Assert.assertEquals("f3", e.getHeaders().get("e3"));
-        Assert.assertEquals("f4", e.getHeaders().get("e4"));
-        Assert.assertEquals("f5", e.getHeaders().get("e5"));
-        Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
-        e = deserialized.get(2);
-        Assert.assertNotNull(e);
-        Assert.assertEquals("b", e.getHeaders().get("q1"));
-        Assert.assertEquals("b2", e.getHeaders().get("q2"));
-        Assert.assertEquals("b3", e.getHeaders().get("q3"));
-        Assert.assertEquals("b4", e.getHeaders().get("q4"));
-        Assert.assertEquals("random_bodyq", new String(e.getBody(), "UTF-8"));
-    }
-
-    @Test
-    public void testDeserializarion() throws Exception {
-        Type listType = new TypeToken<List<JSONEvent>>() {}.getType();
-        List<JSONEvent> events = Lists.newArrayList();
-        Random rand = new Random();
-        for (int i = 1; i < 10; i++) {
-            Map<String, String> input = Maps.newHashMap();
-            for (int j = 1; j < 10; j++) {
-                input.put(String.valueOf(i) + String.valueOf(j), 
String.valueOf(i));
-            }
-            JSONEvent e = new JSONEvent();
-            e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
-            e.setHeaders(input);
-            events.add(e);
-        }
-        Gson gson = new Gson();
-        List<Event> deserialized = handler.getEvents(new 
FlumeHttpServletRequestWrapper(gson.toJson(events, listType)));
-        int i = 0;
-        for (Event e : deserialized) {
-            Event current = events.get(i++);
-            Assert.assertEquals(new String(current.getBody(), "UTF-8"), new 
String(e.getBody(), "UTF-8"));
-            Assert.assertEquals(current.getHeaders(), e.getHeaders());
-        }
-    }
-}
diff --git a/flume-parent/pom.xml b/flume-parent/pom.xml
index 90fcb68e0..3ef43cb53 100644
--- a/flume-parent/pom.xml
+++ b/flume-parent/pom.xml
@@ -254,7 +254,8 @@
     <httpcore.version>4.4.15</httpcore.version>
     <httpclient.version>4.5.13</httpclient.version>
     <irclib.version>1.10</irclib.version>
-    <jetty.version>9.4.51.v20230217</jetty.version>
+    <jakarta-servlet.version>6.1.0</jakarta-servlet.version>
+    <jetty.version>12.1.9</jetty.version>
     <junit.version>4.13.2</junit.version>
     <log4j.version>2.26.0</log4j.version>
     <mapdb.version>0.9.9</mapdb.version>
@@ -280,7 +281,7 @@
     <netty-all.version>4.1.86.Final</netty-all.version>
     <external.protobuf.version>4.35.0</external.protobuf.version>
     <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
-    <prometheus.version>0.15.0</prometheus.version>
+    <prometheus.version>1.7.0</prometheus.version>
     <rat.version>0.12</rat.version>
     <snappy-java.version>1.1.8.4</snappy-java.version>
     <slf4j.version>1.7.32</slf4j.version>
@@ -409,8 +410,16 @@
       </dependency>
 
       <dependency>
-        <groupId>org.eclipse.jetty</groupId>
-        <artifactId>jetty-servlet</artifactId>
+        <groupId>jakarta.servlet</groupId>
+        <artifactId>jakarta.servlet-api</artifactId>
+        <!-- Use 5.0.0 for Jetty 11 / EE9. Use 6.0.0 or 6.1.0 for Jetty 12 
(EE10) -->
+        <version>${jakarta-servlet.version}</version>
+        <scope>provided</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.eclipse.jetty.ee11</groupId>
+        <artifactId>jetty-ee11-servlet</artifactId>
         <version>${jetty.version}</version>
       </dependency>
 
@@ -554,13 +563,13 @@
 
       <dependency>
         <groupId>io.prometheus</groupId>
-        <artifactId>simpleclient</artifactId>
+        <artifactId>prometheus-metrics-core</artifactId>
         <version>${prometheus.version}</version>
       </dependency>
 
       <dependency>
         <groupId>io.prometheus</groupId>
-        <artifactId>simpleclient_servlet</artifactId>
+        <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId>
         <version>${prometheus.version}</version>
       </dependency>
 

Reply via email to