This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch Prometheus-and-Jetty in repository https://gitbox.apache.org/repos/asf/logging-flume.git
commit ee634d488ff0367b49aa4fa041f3d59fecc8dc2d Author: Ralph Goers <[email protected]> AuthorDate: Tue Jun 9 08:15:52 2026 -0700 Upgrade Prometheus and Jetty --- flume-ng-core/pom.xml | 9 +- .../instrumentation/http/HTTPMetricsServer.java | 69 +- .../http/PrometheusHTTPMetricsServer.java | 200 +++--- .../org/apache/flume/source/http/BLOBHandler.java | 98 --- .../flume/source/http/HTTPBadRequestException.java | 42 -- .../org/apache/flume/source/http/HTTPSource.java | 296 --------- .../http/HTTPSourceConfigurationConstants.java | 45 -- .../flume/source/http/HTTPSourceHandler.java | 42 -- .../org/apache/flume/source/http/JSONHandler.java | 133 ---- .../flume/tools/HTTPServerConstraintUtil.java | 36 +- .../http/TestHTTPMetricsServer.java | 2 +- .../flume/source/TestDefaultSourceFactory.java | 5 - .../http/FlumeHttpServletRequestWrapper.java | 406 ------------ .../apache/flume/source/http/TestBLOBHandler.java | 173 ----- .../apache/flume/source/http/TestHTTPSource.java | 732 --------------------- .../apache/flume/source/http/TestJSONHandler.java | 235 ------- flume-parent/pom.xml | 21 +- 17 files changed, 156 insertions(+), 2388 deletions(-) diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index 2c1ee40fa..a5a4d128e 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -131,9 +131,10 @@ <artifactId>netty-all</artifactId> </dependency> + <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> + <groupId>org.eclipse.jetty.ee11</groupId> + <artifactId>jetty-ee11-servlet</artifactId> </dependency> <dependency> @@ -181,12 +182,12 @@ <dependency> <groupId>io.prometheus</groupId> - <artifactId>simpleclient</artifactId> + <artifactId>prometheus-metrics-core</artifactId> </dependency> <dependency> <groupId>io.prometheus</groupId> - <artifactId>simpleclient_servlet</artifactId> + <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId> </dependency> </dependencies> diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java index 1fefc5ec7..506ad59ed 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java @@ -18,21 +18,21 @@ package org.apache.flume.instrumentation.http; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.lang.reflect.Type; +import jakarta.servlet.http.HttpServletResponse; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Map; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.apache.flume.Context; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,45 +94,44 @@ public class HTTPMetricsServer implements MonitorService { port = context.getInteger(CONFIG_PORT, DEFAULT_PORT); } - private class HTTPMetricsHandler extends AbstractHandler { + private class HTTPMetricsHandler extends Handler.Abstract { - Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType(); + java.lang.reflect.Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType(); Gson gson = new Gson(); @Override - public void handle(String target, Request r1, HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - // /metrics is the only place to pull metrics. - // If we want to use any other url for something else, we should make sure - // that for metrics only /metrics is used to prevent backward - // compatibility issues. - if (request.getMethod().equalsIgnoreCase("TRACE") - || request.getMethod().equalsIgnoreCase("OPTIONS")) { - response.sendError(HttpServletResponse.SC_FORBIDDEN); - response.flushBuffer(); - ((Request) request).setHandled(true); - return; + public boolean handle(Request request, Response response, Callback callback) throws Exception { + String method = request.getMethod(); + String path = Request.getPathInContext(request); + + if (method.equalsIgnoreCase("TRACE") || method.equalsIgnoreCase("OPTIONS")) { + response.setStatus(HttpServletResponse.SC_FORBIDDEN); + callback.succeeded(); + return true; } - if (target.equals("/")) { - response.setContentType("text/html;charset=utf-8"); + + if ("/".equals(path)) { response.setStatus(HttpServletResponse.SC_OK); - response.getWriter().write("For Flume metrics please click" + " <a href = \"./metrics\"> here</a>."); - response.flushBuffer(); - ((Request) request).setHandled(true); - return; - } else if (target.equalsIgnoreCase("/metrics")) { - response.setContentType("application/json;charset=utf-8"); + response.getHeaders().put("Content-Type", "text/html;charset=utf-8"); + String html = "For Flume metrics please click <a href=\"./metrics\"> here</a>."; + response.write(true, ByteBuffer.wrap(html.getBytes(StandardCharsets.UTF_8)), callback); + return true; + } + + if ("/metrics".equalsIgnoreCase(path)) { response.setStatus(HttpServletResponse.SC_OK); + response.getHeaders().put("Content-Type", "application/json;charset=utf-8"); + Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); String json = gson.toJson(metricsMap, mapType); - response.getWriter().write(json); - response.flushBuffer(); - ((Request) request).setHandled(true); - return; + + response.write(true, ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)), callback); + return true; } - response.sendError(HttpServletResponse.SC_NOT_FOUND); - response.flushBuffer(); - // Not handling the request returns a Not found error page. + + response.setStatus(HttpServletResponse.SC_NOT_FOUND); + callback.succeeded(); + return true; } } } diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java index 3512050eb..ea876b131 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java @@ -17,16 +17,14 @@ package org.apache.flume.instrumentation.http; import com.google.common.base.Throwables; -import io.prometheus.client.Collector; -import io.prometheus.client.CounterMetricFamily; -import io.prometheus.client.GaugeMetricFamily; -import io.prometheus.client.exporter.MetricsServlet; +import io.prometheus.metrics.core.metrics.Counter; +import io.prometheus.metrics.core.metrics.Gauge; +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet; import java.lang.management.ManagementFactory; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -44,20 +42,16 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.ee11.servlet.ServletContextHandler; +import org.eclipse.jetty.ee11.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Monitor service implementation that runs a web server on a configurable - * port and returns the metrics for components in JSON format. <p> Optional + * port and returns the metrics for components in Prometheus format. <p> Optional * parameters: <p> <tt>port</tt> : The port on which the server should listen - * to.<p> Returns metrics in the following format: <p> - * - * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"} - * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"} - * <p> } + * to.<p> Returns metrics in Prometheus text format via /metrics endpoint */ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements MonitorService { @@ -66,12 +60,13 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo private static Logger LOG = LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class); private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - private FlumePrometheusCollector requests; + private FlumePrometheusCollector metricsCollector; @Override public void start() { - requests = new FlumePrometheusCollector().register(); + metricsCollector = new FlumePrometheusCollector(); + metricsCollector.register(); jettyServer = new Server(); // We can use Contexts etc if we have many urls to handle. For one url, @@ -84,24 +79,27 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); jettyServer.setHandler(context); - context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics"); + context.addServlet(new ServletHolder(new PrometheusMetricsServlet()), "/metrics"); try { jettyServer.start(); while (!jettyServer.isStarted()) { Thread.sleep(500); } } catch (Exception ex) { - LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex); + LOG.error("Error starting Jetty. Prometheus Metrics may not be available.", ex); } } - class FlumePrometheusCollector extends Collector { - - public List<MetricFamilySamples> collect() { + class FlumePrometheusCollector { + private final Map<String, Counter> counters = new HashMap<>(); + private final Map<String, Gauge> gauges = new HashMap<>(); + private final PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; - Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = new HashMap<>(); - List<Collector.MetricFamilySamples> mfs = new ArrayList<>(); + public void register() { + collectMetrics(); + } + private void collectMetrics() { Set<ObjectInstance> queryMBeans; try { queryMBeans = mbeanServer.queryMBeans(null, null); @@ -109,57 +107,44 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo for (ObjectInstance obj : queryMBeans) { try { if (obj.getObjectName().toString().startsWith("org.apache.flume")) { - processFlumeMetric(counterMetricMap, mfs, obj); + processFlumeMetric(obj); } else if ((obj.getObjectName().toString().startsWith("kafka.consumer") - || obj.getObjectName().toString().startsWith("kafka.producer")) + || obj.getObjectName().toString().startsWith("kafka.producer")) && obj.getObjectName().toString().contains("metrics")) { - processKafkaMetric(counterMetricMap, mfs, obj); + processKafkaMetric(obj); } } catch (Exception e) { LOG.error("Unable to poll JMX for metrics.", e); } } - return mfs; } catch (Exception ex) { LOG.error("Could not get Mbeans for monitoring", ex); Throwables.propagate(ex); - return null; } } - private void processFlumeMetric( - Map<Object, Map<String, MetricFamilySamples>> counterMetricMap, - List<MetricFamilySamples> mfs, - ObjectInstance obj) + private void processFlumeMetric(ObjectInstance obj) throws ClassNotFoundException, InstanceNotFoundException, IntrospectionException, ReflectionException { - Class mbeanClass = Class.forName(obj.getClassName()); - Map<String, MetricFamilySamples> metricsMap; - - if (!counterMetricMap.containsKey(mbeanClass)) { - metricsMap = new HashMap<>(); - - for (Method method : mbeanClass.getMethods()) { - String methodName = method.getName(); - if (methodName.startsWith("increment") && methodName.length() > "increment".length()) { - String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length()); - createCounterIfNotExists(mfs, metricsMap, counterName); - } else if (methodName.startsWith("addTo")) { - String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length()); - createCounterIfNotExists(mfs, metricsMap, counterName); - } else if (methodName.startsWith("set")) { - String counterName = PROM_DEFAULT_PREFIX + methodName.substring("set".length()); - createGaugeIfNotExists(mfs, metricsMap, counterName, Arrays.asList("component")); - } + Class<?> mbeanClass = Class.forName(obj.getClassName()); + + // First pass: create counters and gauges based on method names + for (Method method : mbeanClass.getMethods()) { + String methodName = method.getName(); + if (methodName.startsWith("increment") && methodName.length() > "increment".length()) { + String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length()); + createCounterIfNotExists(counterName); + } else if (methodName.startsWith("addTo")) { + String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length()); + createCounterIfNotExists(counterName); + } else if (methodName.startsWith("set")) { + String gaugeName = PROM_DEFAULT_PREFIX + methodName.substring("set".length()); + createGaugeIfNotExists(gaugeName); } - - counterMetricMap.put(mbeanClass, metricsMap); - - } else { - metricsMap = counterMetricMap.get(mbeanClass); } + // Second pass: get attribute values and update metrics MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes(); String[] strAtts = new String[attrs.length]; @@ -174,26 +159,25 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo for (Object attr : attrList) { Attribute localAttr = (Attribute) attr; if (!localAttr.getName().equalsIgnoreCase("type")) { - MetricFamilySamples samples = metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName()); - if (samples instanceof CounterMetricFamily) { - ((CounterMetricFamily) samples) - .addMetric( - Arrays.asList(component), - Double.valueOf(localAttr.getValue().toString())); - } else if (samples instanceof GaugeMetricFamily) { - ((GaugeMetricFamily) samples) - .addMetric( - Arrays.asList(component), - Double.valueOf(localAttr.getValue().toString())); + String metricName = PROM_DEFAULT_PREFIX + localAttr.getName(); + double value = Double.parseDouble(localAttr.getValue().toString()); + + Counter counter = counters.get(metricName); + if (counter != null) { + // For counters, we label by component + counter.labelValues(component).inc(value); + } + + Gauge gauge = gauges.get(metricName); + if (gauge != null) { + // For gauges, we label by component + gauge.labelValues(component).set(value); } } } } - private void processKafkaMetric( - Map<Object, Map<String, MetricFamilySamples>> counterMetricMap, - List<MetricFamilySamples> mfs, - ObjectInstance obj) + private void processKafkaMetric(ObjectInstance obj) throws InstanceNotFoundException, IntrospectionException, ReflectionException { ObjectName objectName = obj.getObjectName(); @@ -201,18 +185,12 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo TreeMap<String, String> properties = new TreeMap<>(); for (String key : objectName.getKeyPropertyList().keySet()) { - properties.put( - makeStringPromSafe(key), objectName.getKeyPropertyList().get(key)); + properties.put(makeStringPromSafe(key), objectName.getKeyPropertyList().get(key)); } - // We create a unique name for the metric based on the metric that came from Kafka, plus - // all of the properties. Unfortunately Kafka does not have unique metric names and therefore - // you can end up with metrics with differing property lists (which you can't have. String metricKey = qualifiedType + "_" + String.join("_", properties.keySet()) + "_"; - Map<String, MetricFamilySamples> metricsMap; - - // Get the attribute list now as we'll need it to create the gauge + // Get the attribute list now as we'll need it to create gauges MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes(); String[] strAtts = new String[attrs.length]; @@ -220,22 +198,10 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo strAtts[i] = attrs[i].getName(); } - // We pre-create each metric (once) before populating it once for each matching mbean - if (!counterMetricMap.containsKey(metricKey)) { - metricsMap = new HashMap<>(); - - for (String attr : strAtts) { - createGaugeIfNotExists( - mfs, - metricsMap, - metricKey + "_" + makeStringPromSafe(attr), - new ArrayList<>(properties.keySet())); - } - - counterMetricMap.put(metricKey, metricsMap); - - } else { - metricsMap = counterMetricMap.get(metricKey); + // Pre-create each metric (once) before populating it + for (String attr : strAtts) { + String gaugeName = metricKey + "_" + makeStringPromSafe(attr); + createGaugeIfNotExists(gaugeName); } AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts); @@ -244,42 +210,42 @@ public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements Mo Attribute localAttr = (Attribute) attr; try { - - GaugeMetricFamily samples = (GaugeMetricFamily) - metricsMap.get(metricKey + "_" + makeStringPromSafe(localAttr.getName())); - samples.addMetric( - new ArrayList<>(properties.values()), - Double.valueOf(localAttr.getValue().toString())); + String gaugeName = metricKey + "_" + makeStringPromSafe(localAttr.getName()); + Gauge gauge = gauges.get(gaugeName); + if (gauge != null) { + double value = Double.parseDouble(localAttr.getValue().toString()); + gauge.labelValues(new ArrayList<>(properties.values()).toArray(new String[0])) + .set(value); + } } catch (Exception e) { LOG.warn("Metric {} could not be monitored", metricKey, e); } } } - // Prometeus is really unhappy with metrics with , or - in, so replace them + // Prometheus is really unhappy with metrics with , or - in, so replace them private String makeStringPromSafe(String input) { return input.replaceAll("[.\\-]", ""); } - private void createCounterIfNotExists( - List<MetricFamilySamples> mfs, Map<String, MetricFamilySamples> metricsMap, String counterName) { - if (!metricsMap.containsKey(counterName)) { - CounterMetricFamily labeledCounter = - new CounterMetricFamily(counterName, counterName, Arrays.asList("component")); - metricsMap.put(counterName, labeledCounter); - mfs.add(labeledCounter); + private void createCounterIfNotExists(String counterName) { + if (!counters.containsKey(counterName)) { + Counter counter = Counter.builder() + .name(counterName) + .help(counterName) + .labelNames("component") + .register(); + counters.put(counterName, counter); } } - private void createGaugeIfNotExists( - List<MetricFamilySamples> mfs, - Map<String, MetricFamilySamples> metricsMap, - String gaugeName, - List<String> labelNames) { - if (!metricsMap.containsKey(gaugeName)) { - GaugeMetricFamily labelledGauge = new GaugeMetricFamily(gaugeName, gaugeName, labelNames); - metricsMap.put(gaugeName, labelledGauge); - mfs.add(labelledGauge); + private void createGaugeIfNotExists(String gaugeName) { + if (!gauges.containsKey(gaugeName)) { + Gauge gauge = Gauge.builder() + .name(gaugeName) + .help(gaugeName) + .register(); + gauges.put(gaugeName, gauge); } } } diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java deleted file mode 100644 index 17bd2b3da..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import com.google.common.base.Preconditions; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.servlet.http.HttpServletRequest; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.conf.LogPrivacyUtil; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * BLOBHandler for HTTPSource that accepts any binary stream of data as event. - * - */ -public class BLOBHandler implements HTTPSourceHandler { - - private static final Logger LOG = LoggerFactory.getLogger(BLOBHandler.class); - - private String commaSeparatedHeaders; - - private String[] mandatoryHeaders; - - public static final String MANDATORY_PARAMETERS = "mandatoryParameters"; - - public static final String DEFAULT_MANDATORY_PARAMETERS = ""; - - public static final String PARAMETER_SEPARATOR = ","; - - /** - * {@inheritDoc} - */ - @SuppressWarnings("unchecked") - @Override - public List<Event> getEvents(HttpServletRequest request) throws Exception { - Map<String, String> headers = new HashMap<String, String>(); - - InputStream inputStream = request.getInputStream(); - - Map<String, String[]> parameters = request.getParameterMap(); - for (String parameter : parameters.keySet()) { - String value = parameters.get(parameter)[0]; - if (LOG.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) { - LOG.debug("Setting Header [Key, Value] as [{},{}] ", parameter, value); - } - headers.put(parameter, value); - } - - for (String header : mandatoryHeaders) { - Preconditions.checkArgument( - headers.containsKey(header), "Please specify " + header + " parameter in the request."); - } - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try { - IOUtils.copy(inputStream, outputStream); - LOG.debug("Building an Event with stream of size -- {}", outputStream.size()); - Event event = EventBuilder.withBody(outputStream.toByteArray(), headers); - event.setHeaders(headers); - List<Event> eventList = new ArrayList<Event>(); - eventList.add(event); - return eventList; - } finally { - outputStream.close(); - inputStream.close(); - } - } - - @Override - public void configure(Context context) { - this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, DEFAULT_MANDATORY_PARAMETERS); - this.mandatoryHeaders = commaSeparatedHeaders.split(PARAMETER_SEPARATOR); - } -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java deleted file mode 100644 index 61983382e..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import org.apache.flume.FlumeException; - -/** - * - * Exception thrown by an HTTP Handler if the request was not parsed correctly - * into an event because the request was not in the expected format. - * - */ -public class HTTPBadRequestException extends FlumeException { - - private static final long serialVersionUID = -3540764742069390951L; - - public HTTPBadRequestException(String msg) { - super(msg); - } - - public HTTPBadRequestException(String msg, Throwable th) { - super(msg, th); - } - - public HTTPBadRequestException(Throwable th) { - super(th); - } -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java deleted file mode 100644 index 52c29ca9e..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.conf.Configurable; -import org.apache.flume.exception.ChannelException; -import org.apache.flume.instrumentation.SourceCounter; -import org.apache.flume.source.SslContextAwareAbstractSource; -import org.apache.flume.tools.FlumeBeanConfigurator; -import org.apache.flume.tools.HTTPServerConstraintUtil; -import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.jmx.MBeanContainer; -import org.eclipse.jetty.server.HttpConfiguration; -import org.eclipse.jetty.server.HttpConnectionFactory; -import org.eclipse.jetty.server.SecureRequestCustomizer; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.SslConnectionFactory; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A source which accepts Flume Events by HTTP POST and GET. GET should be used - * for experimentation only. HTTP requests are converted into flume events by a - * pluggable "handler" which must implement the - * {@linkplain HTTPSourceHandler} interface. This handler takes a - * {@linkplain HttpServletRequest} and returns a list of flume events. - * - * The source accepts the following parameters: <p> <tt>port</tt>: port to which - * the server should bind. Mandatory <p> <tt>handler</tt>: the class that - * deserializes a HttpServletRequest into a list of flume events. This class - * must implement HTTPSourceHandler. Default: - * {@linkplain JSONHandler}. <p> <tt>handler.*</tt> Any configuration - * to be passed to the handler. <p> - * - * All events deserialized from one Http request are committed to the channel in - * one transaction, thus allowing for increased efficiency on channels like the - * file channel. If the handler throws an exception this source will return - * a HTTP status of 400. If the channel is full, or the source is unable to - * append events to the channel, the source will return a HTTP 503 - Temporarily - * unavailable status. - * - * A JSON handler which converts JSON objects to Flume events is provided. - * - */ -public class HTTPSource extends SslContextAwareAbstractSource implements EventDrivenSource, Configurable { - /* - * There are 2 ways of doing this: - * a. Have a static server instance and use connectors in each source - * which binds to the port defined for that source. - * b. Each source starts its own server instance, which binds to the source's - * port. - * - * b is more efficient than a because Jetty does not allow binding a - * servlet to a connector. So each request will need to go through each - * each of the handlers/servlet till the correct one is found. - * - */ - - private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class); - private volatile Integer port; - private volatile Server srv; - private volatile String host; - private HTTPSourceHandler handler; - private SourceCounter sourceCounter; - - private Context sourceContext; - - @Override - public void configure(Context context) { - configureSsl(context); - sourceContext = context; - try { - port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT); - host = context.getString( - HTTPSourceConfigurationConstants.CONFIG_BIND, HTTPSourceConfigurationConstants.DEFAULT_BIND); - - Preconditions.checkState(host != null && !host.isEmpty(), "HTTPSource hostname specified is empty"); - Preconditions.checkNotNull(port, "HTTPSource requires a port number to be" + " specified"); - - String handlerClassName = context.getString( - HTTPSourceConfigurationConstants.CONFIG_HANDLER, - HTTPSourceConfigurationConstants.DEFAULT_HANDLER) - .trim(); - - @SuppressWarnings("unchecked") - Class<? extends HTTPSourceHandler> clazz = - (Class<? extends HTTPSourceHandler>) Class.forName(handlerClassName); - handler = clazz.getDeclaredConstructor().newInstance(); - - Map<String, String> subProps = - context.getSubProperties(HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX); - handler.configure(new Context(subProps)); - } catch (ClassNotFoundException ex) { - LOG.error("Error while configuring HTTPSource. Exception follows.", ex); - Throwables.propagate(ex); - } catch (ClassCastException ex) { - LOG.error("Deserializer is not an instance of HTTPSourceHandler." - + "Deserializer must implement HTTPSourceHandler."); - Throwables.propagate(ex); - } catch (Exception ex) { - LOG.error("Error configuring HTTPSource!", ex); - Throwables.propagate(ex); - } - if (sourceCounter == null) { - sourceCounter = new SourceCounter(getName()); - } - } - - @Override - public void start() { - Preconditions.checkState( - srv == null, - "Running HTTP Server found in source: " + getName() - + " before I started one." - + "Will not attempt to start."); - QueuedThreadPool threadPool = new QueuedThreadPool(); - if (sourceContext.getSubProperties("QueuedThreadPool.").size() > 0) { - FlumeBeanConfigurator.setConfigurationFields(threadPool, sourceContext); - } - srv = new Server(threadPool); - - // Register with JMX for advanced monitoring - MBeanContainer mbContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer()); - srv.addEventListener(mbContainer); - srv.addBean(mbContainer); - - HttpConfiguration httpConfiguration = new HttpConfiguration(); - httpConfiguration.addCustomizer(new SecureRequestCustomizer()); - - FlumeBeanConfigurator.setConfigurationFields(httpConfiguration, sourceContext); - ServerConnector connector = getSslContextSupplier() - .get() - .map(sslContext -> { - SslContextFactory sslCtxFactory = new SslContextFactory(); - sslCtxFactory.setSslContext(sslContext); - sslCtxFactory.setExcludeProtocols(getExcludeProtocols().toArray(new String[] {})); - sslCtxFactory.setIncludeProtocols(getIncludeProtocols().toArray(new String[] {})); - sslCtxFactory.setExcludeCipherSuites( - getExcludeCipherSuites().toArray(new String[] {})); - sslCtxFactory.setIncludeCipherSuites( - getIncludeCipherSuites().toArray(new String[] {})); - - FlumeBeanConfigurator.setConfigurationFields(sslCtxFactory, sourceContext); - - httpConfiguration.setSecurePort(port); - httpConfiguration.setSecureScheme("https"); - - return new ServerConnector( - srv, - new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.asString()), - new HttpConnectionFactory(httpConfiguration)); - }) - .orElse(new ServerConnector(srv, new HttpConnectionFactory(httpConfiguration))); - - connector.setPort(port); - connector.setHost(host); - connector.setReuseAddress(true); - - FlumeBeanConfigurator.setConfigurationFields(connector, sourceContext); - - srv.addConnector(connector); - - try { - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath("/"); - srv.setHandler(context); - - context.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/"); - context.setSecurityHandler(HTTPServerConstraintUtil.enforceConstraints()); - srv.start(); - } catch (Exception ex) { - LOG.error("Error while starting HTTPSource. Exception follows.", ex); - Throwables.propagate(ex); - } - Preconditions.checkArgument(srv.isRunning()); - sourceCounter.start(); - super.start(); - } - - @Override - public void stop() { - try { - srv.stop(); - srv.join(); - srv = null; - } catch (Exception ex) { - LOG.error("Error while stopping HTTPSource. Exception follows.", ex); - } - sourceCounter.stop(); - LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter); - } - - private class FlumeHTTPServlet extends HttpServlet { - - private static final long serialVersionUID = 4891924863218790344L; - - @Override - public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException { - List<Event> events = Collections.emptyList(); // create empty list - try { - events = handler.getEvents(request); - } catch (HTTPBadRequestException ex) { - LOG.warn("Received bad request from client. ", ex); - sourceCounter.incrementEventReadFail(); - response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Bad request from client."); - return; - } catch (Exception ex) { - LOG.warn("Deserializer threw unexpected exception. ", ex); - sourceCounter.incrementEventReadFail(); - response.sendError( - HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Deserializer threw unexpected exception."); - return; - } - sourceCounter.incrementAppendBatchReceivedCount(); - sourceCounter.addToEventReceivedCount(events.size()); - try { - getChannelProcessor().processEventBatch(events); - } catch (ChannelException ex) { - LOG.warn( - "Error appending event to channel. " - + "Channel might be full. Consider increasing the channel " - + "capacity or make sure the sinks perform faster.", - ex); - sourceCounter.incrementChannelWriteFail(); - response.sendError( - HttpServletResponse.SC_SERVICE_UNAVAILABLE, - "Error appending event to channel. Channel might be full."); - return; - } catch (Exception ex) { - LOG.warn("Unexpected error appending event to channel. ", ex); - sourceCounter.incrementGenericProcessingFail(); - response.sendError( - HttpServletResponse.SC_INTERNAL_SERVER_ERROR, - "Unexpected error while appending event to channel."); - return; - } - response.setCharacterEncoding(request.getCharacterEncoding()); - response.setStatus(HttpServletResponse.SC_OK); - response.flushBuffer(); - sourceCounter.incrementAppendBatchAcceptedCount(); - sourceCounter.addToEventAcceptedCount(events.size()); - } - - @Override - public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { - doPost(request, response); - } - } - - @Override - protected void configureSsl(Context context) { - handleDeprecatedParameter(context, "ssl", "enableSSL"); - handleDeprecatedParameter(context, "exclude-protocols", "excludeProtocols"); - handleDeprecatedParameter(context, "keystore-password", "keystorePassword"); - - super.configureSsl(context); - } - - private void handleDeprecatedParameter(Context context, String newParam, String oldParam) { - if (!context.containsKey(newParam) && context.containsKey(oldParam)) { - context.put(newParam, context.getString(oldParam)); - } - } -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java deleted file mode 100644 index 0c6484ded..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -/** - * - */ -public class HTTPSourceConfigurationConstants { - - public static final String CONFIG_PORT = "port"; - public static final String CONFIG_HANDLER = "handler"; - public static final String CONFIG_HANDLER_PREFIX = CONFIG_HANDLER + "."; - - public static final String CONFIG_BIND = "bind"; - - public static final String DEFAULT_BIND = "0.0.0.0"; - - public static final String DEFAULT_HANDLER = "org.apache.flume.source.http.JSONHandler"; - - @Deprecated - public static final String SSL_KEYSTORE = "keystore"; - - @Deprecated - public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword"; - - @Deprecated - public static final String SSL_ENABLED = "enableSSL"; - - @Deprecated - public static final String EXCLUDE_PROTOCOLS = "excludeProtocols"; -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java deleted file mode 100644 index b0aa5a079..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import java.util.List; -import javax.servlet.http.HttpServletRequest; -import org.apache.flume.Event; -import org.apache.flume.conf.Configurable; - -/** - * - */ -public interface HTTPSourceHandler extends Configurable { - - /** - * Takes an {@linkplain HttpServletRequest} and returns a list of Flume - * Events. If this request cannot be parsed into Flume events based on the - * format this method will throw an exception. This method may also throw an - * exception if there is some sort of other error. <p> - * - * @param request The request to be parsed into Flume events. - * @return List of Flume events generated from the request. - * @throws HTTPBadRequestException If the was not parsed correctly into an - * event because the request was not in the expected format. - * @throws Exception If there was an unexpected error. - */ - public List<Event> getEvents(HttpServletRequest request) throws HTTPBadRequestException, Exception; -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java deleted file mode 100644 index 961766a0f..000000000 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonSyntaxException; -import com.google.gson.reflect.TypeToken; -import java.io.BufferedReader; -import java.lang.reflect.Type; -import java.nio.charset.UnsupportedCharsetException; -import java.util.ArrayList; -import java.util.List; -import javax.servlet.http.HttpServletRequest; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.event.JSONEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JSONHandler for HTTPSource that accepts an array of events. - * - * This handler throws exception if the deserialization fails because of bad - * format or any other reason. - * - * Each event must be encoded as a map with two key-value pairs. <p> 1. headers - * - the key for this key-value pair is "headers". The value for this key is - * another map, which represent the event headers. These headers are inserted - * into the Flume event as is. <p> 2. body - The body is a string which - * represents the body of the event. The key for this key-value pair is "body". - * All key-value pairs are considered to be headers. An example: <p> [{"headers" - * : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body": - * "random_body2"}] <p> would be interpreted as the following two flume events: - * <p> * Event with body: "random_body" (in UTF-8/UTF-16/UTF-32 encoded bytes) - * and headers : (a:b, c:d) <p> * - * Event with body: "random_body2" (in UTF-8/UTF-16/UTF-32 encoded bytes) and - * headers : (e:f) <p> - * - * The charset of the body is read from the request and used. If no charset is - * set in the request, then the charset is assumed to be JSON's default - UTF-8. - * The JSON handler supports UTF-8, UTF-16 and UTF-32. - * - * To set the charset, the request must have content type specified as - * "application/json; charset=UTF-8" (replace UTF-8 with UTF-16 or UTF-32 as - * required). - * - * One way to create an event in the format expected by this handler, is to - * use {@linkplain JSONEvent} and use {@linkplain Gson} to create the JSON - * string using the - * {@linkplain Gson#toJson(java.lang.Object, java.lang.reflect.Type) } - * method. The type token to pass as the 2nd argument of this method - * for list of events can be created by: <p> - * {@code - * Type type = new TypeToken<List<JSONEvent>>() {}.getType(); - * } - */ -public class JSONHandler implements HTTPSourceHandler { - - private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class); - private final Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); - private final Gson gson; - - public JSONHandler() { - gson = new GsonBuilder().disableHtmlEscaping().create(); - } - - /** - * {@inheritDoc} - */ - @Override - public List<Event> getEvents(HttpServletRequest request) throws Exception { - BufferedReader reader = request.getReader(); - String charset = request.getCharacterEncoding(); - // UTF-8 is default for JSON. If no charset is specified, UTF-8 is to - // be assumed. - if (charset == null) { - LOG.debug("Charset is null, default charset of UTF-8 will be used."); - charset = "UTF-8"; - } else if (!(charset.equalsIgnoreCase("utf-8") - || charset.equalsIgnoreCase("utf-16") - || charset.equalsIgnoreCase("utf-32"))) { - LOG.error( - "Unsupported character set in request {}. " - + "JSON handler supports UTF-8, " - + "UTF-16 and UTF-32 only.", - charset); - throw new UnsupportedCharsetException("JSON handler supports UTF-8, " + "UTF-16 and UTF-32 only."); - } - - /* - * Gson throws Exception if the data is not parseable to JSON. - * Need not catch it since the source will catch it and return error. - */ - List<Event> eventList = new ArrayList<Event>(0); - try { - eventList = gson.fromJson(reader, listType); - } catch (JsonSyntaxException ex) { - throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex); - } - - for (Event e : eventList) { - ((JSONEvent) e).setCharset(charset); - } - return getSimpleEvents(eventList); - } - - @Override - public void configure(Context context) {} - - private List<Event> getSimpleEvents(List<Event> events) { - List<Event> newEvents = new ArrayList<Event>(events.size()); - for (Event e : events) { - newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders())); - } - return newEvents; - } -} diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java index 8dfef6a54..d1ba83bac 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java @@ -16,11 +16,9 @@ */ package org.apache.flume.tools; -import org.eclipse.jetty.security.ConstraintMapping; -import org.eclipse.jetty.security.ConstraintSecurityHandler; -import org.eclipse.jetty.util.security.Constraint; - -// Most of the code in this class is copied from HBASE-10473 +import org.eclipse.jetty.ee11.servlet.security.ConstraintMapping; +import org.eclipse.jetty.ee11.servlet.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.Constraint; /** * Utility class to define constraints on Jetty HTTP servers @@ -34,22 +32,24 @@ public class HTTPServerConstraintUtil { * @return ConstraintSecurityHandler for use with Jetty servlet */ public static ConstraintSecurityHandler enforceConstraints() { - Constraint c = new Constraint(); - c.setAuthenticate(true); + // 1. Create a constraint that denies TRACE and OPTIONS access + Constraint constraint = Constraint.from("Deny Methods", Constraint.Authorization.FORBIDDEN); - ConstraintMapping cmt = new ConstraintMapping(); - cmt.setConstraint(c); - cmt.setMethod("TRACE"); - cmt.setPathSpec("/*"); + // 2. Map the constraint to methods TRACE and OPTIONS on all paths + ConstraintMapping traceMapping = new ConstraintMapping(); + traceMapping.setPathSpec("/*"); + traceMapping.setMethod("TRACE"); + traceMapping.setConstraint(constraint); - ConstraintMapping cmo = new ConstraintMapping(); - cmo.setConstraint(c); - cmo.setMethod("OPTIONS"); - cmo.setPathSpec("/*"); + ConstraintMapping optionsMapping = new ConstraintMapping(); + optionsMapping.setPathSpec("/*"); + optionsMapping.setMethod("OPTIONS"); + optionsMapping.setConstraint(constraint); - ConstraintSecurityHandler sh = new ConstraintSecurityHandler(); - sh.setConstraintMappings(new ConstraintMapping[] {cmt, cmo}); + // 3. Configure the ConstraintSecurityHandler + ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); + securityHandler.setConstraintMappings(new ConstraintMapping[]{traceMapping, optionsMapping}); - return sh; + return securityHandler; } } diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java index 3cb5e4bd8..94b1ebe22 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java @@ -24,7 +24,7 @@ import java.lang.reflect.Type; import java.net.HttpURLConnection; import java.net.URL; import java.util.Map; -import javax.servlet.http.HttpServletResponse; +import jakarta.servlet.http.HttpServletResponse; import org.apache.flume.Context; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXTestUtils; diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java index dfd7fd7a7..b4f0e58f1 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java @@ -18,7 +18,6 @@ package org.apache.flume.source; import org.apache.flume.Source; import org.apache.flume.SourceFactory; -import org.apache.flume.source.http.HTTPSource; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -63,13 +62,9 @@ public class TestDefaultSourceFactory { verifySourceCreation("netcat-src", "netcat", NetcatSource.class); verifySourceCreation("netcat-udp-src", "netcatudp", NetcatUdpSource.class); verifySourceCreation("exec-src", "exec", ExecSource.class); - // verifySourceCreation("avro-src", "avro", AvroSource.class); verifySourceCreation("syslogtcp-src", "syslogtcp", SyslogTcpSource.class); verifySourceCreation("multiport_syslogtcp-src", "multiport_syslogtcp", MultiportSyslogTCPSource.class); verifySourceCreation("syslogudp-src", "syslogudp", SyslogUDPSource.class); - // verifySourceCreation("spooldir-src", "spooldir", SpoolDirectorySource.class); - verifySourceCreation("http-src", "http", HTTPSource.class); - // verifySourceCreation("thrift-src", "thrift", ThriftSource.class); verifySourceCreation("custom-src", MockSource.class.getCanonicalName(), MockSource.class); } } diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java deleted file mode 100644 index e13b739fa..000000000 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.UnsupportedEncodingException; -import java.security.Principal; -import java.util.Collection; -import java.util.Enumeration; -import java.util.Locale; -import java.util.Map; -import javax.servlet.AsyncContext; -import javax.servlet.DispatcherType; -import javax.servlet.RequestDispatcher; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletInputStream; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import javax.servlet.http.Cookie; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; -import javax.servlet.http.HttpUpgradeHandler; -import javax.servlet.http.Part; - -/** - * - */ -class FlumeHttpServletRequestWrapper implements HttpServletRequest { - - private BufferedReader reader; - - String charset; - - public FlumeHttpServletRequestWrapper(String data, String charset) throws UnsupportedEncodingException { - reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data.getBytes(charset)), charset)); - this.charset = charset; - } - - public FlumeHttpServletRequestWrapper(String data) throws UnsupportedEncodingException { - this(data, "UTF-8"); - } - - @Override - public String getAuthType() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Cookie[] getCookies() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public long getDateHeader(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getHeader(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Enumeration<String> getHeaders(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Enumeration<String> getHeaderNames() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getIntHeader(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getMethod() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getPathInfo() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getPathTranslated() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getContextPath() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getQueryString() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getRemoteUser() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isUserInRole(String role) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Principal getUserPrincipal() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getRequestedSessionId() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getRequestURI() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public StringBuffer getRequestURL() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getServletPath() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public HttpSession getSession(boolean create) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public HttpSession getSession() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isRequestedSessionIdValid() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isRequestedSessionIdFromCookie() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isRequestedSessionIdFromURL() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isRequestedSessionIdFromUrl() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Object getAttribute(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Enumeration<String> getAttributeNames() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getCharacterEncoding() { - return charset; - } - - @Override - public void setCharacterEncoding(String env) throws UnsupportedEncodingException { - this.charset = env; - } - - @Override - public int getContentLength() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getContentType() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public ServletInputStream getInputStream() throws IOException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getParameter(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Enumeration<String> getParameterNames() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String[] getParameterValues(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Map<String, String[]> getParameterMap() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getProtocol() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getScheme() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getServerName() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getServerPort() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public BufferedReader getReader() throws IOException { - return reader; - } - - @Override - public String getRemoteAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getRemoteHost() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void setAttribute(String name, Object o) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void removeAttribute(String name) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Locale getLocale() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Enumeration<Locale> getLocales() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isSecure() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public RequestDispatcher getRequestDispatcher(String path) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getRealPath(String path) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getRemotePort() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getLocalName() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String getLocalAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getLocalPort() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public AsyncContext getAsyncContext() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public long getContentLengthLong() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public DispatcherType getDispatcherType() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public ServletContext getServletContext() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isAsyncStarted() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isAsyncSupported() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public AsyncContext startAsync() throws IllegalStateException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public AsyncContext startAsync(ServletRequest arg0, ServletResponse arg1) throws IllegalStateException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean authenticate(HttpServletResponse arg0) throws IOException, ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public String changeSessionId() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Part getPart(String arg0) throws IOException, ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public Collection<Part> getParts() throws IOException, ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void login(String arg0, String arg1) throws ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void logout() throws ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public <T extends HttpUpgradeHandler> T upgrade(Class<T> arg0) throws IOException, ServletException { - throw new UnsupportedOperationException("Not supported yet."); - } -} diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java deleted file mode 100644 index 655bd7d32..000000000 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.servlet.ReadListener; -import javax.servlet.ServletInputStream; -import javax.servlet.http.HttpServletRequest; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.junit.Before; -import org.junit.Test; - -/** - * - */ -public class TestBLOBHandler { - - HTTPSourceHandler handler; - - @Before - public void setUp() { - handler = new BLOBHandler(); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Test - public void testCSVData() throws Exception { - Map requestParameterMap = new HashMap(); - requestParameterMap.put("param1", new String[] {"value1"}); - requestParameterMap.put("param2", new String[] {"value2"}); - - HttpServletRequest req = mock(HttpServletRequest.class); - final String csvData = "a,b,c"; - - ServletInputStream servletInputStream = - new DelegatingServletInputStream(new ByteArrayInputStream(csvData.getBytes())); - - when(req.getInputStream()).thenReturn(servletInputStream); - when(req.getParameterMap()).thenReturn(requestParameterMap); - - Context context = mock(Context.class); - when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)) - .thenReturn("param1,param2"); - - handler.configure(context); - List<Event> deserialized = handler.getEvents(req); - assertEquals(1, deserialized.size()); - Event e = deserialized.get(0); - - assertEquals(new String(e.getBody()), csvData); - assertEquals(e.getHeaders().get("param1"), "value1"); - assertEquals(e.getHeaders().get("param2"), "value2"); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Test - public void testTabData() throws Exception { - Map requestParameterMap = new HashMap(); - requestParameterMap.put("param1", new String[] {"value1"}); - - HttpServletRequest req = mock(HttpServletRequest.class); - final String tabData = "a\tb\tc"; - - ServletInputStream servletInputStream = - new DelegatingServletInputStream(new ByteArrayInputStream(tabData.getBytes())); - - when(req.getInputStream()).thenReturn(servletInputStream); - when(req.getParameterMap()).thenReturn(requestParameterMap); - - Context context = mock(Context.class); - when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)) - .thenReturn("param1"); - - handler.configure(context); - - List<Event> deserialized = handler.getEvents(req); - assertEquals(1, deserialized.size()); - Event e = deserialized.get(0); - - assertEquals(new String(e.getBody()), tabData); - assertEquals(e.getHeaders().get("param1"), "value1"); - } - - @SuppressWarnings({"rawtypes"}) - @Test(expected = IllegalArgumentException.class) - public void testMissingParameters() throws Exception { - Map requestParameterMap = new HashMap(); - - HttpServletRequest req = mock(HttpServletRequest.class); - final String tabData = "a\tb\tc"; - - ServletInputStream servletInputStream = - new DelegatingServletInputStream(new ByteArrayInputStream(tabData.getBytes())); - - when(req.getInputStream()).thenReturn(servletInputStream); - when(req.getParameterMap()).thenReturn(requestParameterMap); - - Context context = mock(Context.class); - when(context.getString(BLOBHandler.MANDATORY_PARAMETERS, BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)) - .thenReturn("param1"); - - handler.configure(context); - - handler.getEvents(req); - } - - class DelegatingServletInputStream extends ServletInputStream { - - private final InputStream sourceStream; - - /** - * Create a DelegatingServletInputStream for the given source stream. - * - * @param sourceStream - * the source stream (never <code>null</code>) - */ - public DelegatingServletInputStream(InputStream sourceStream) { - this.sourceStream = sourceStream; - } - - /** - * Return the underlying source stream (never <code>null</code>). - */ - public final InputStream getSourceStream() { - return this.sourceStream; - } - - public int read() throws IOException { - return this.sourceStream.read(); - } - - public void close() throws IOException { - super.close(); - this.sourceStream.close(); - } - - public boolean isFinished() { - throw new UnsupportedOperationException("Not supported yet."); - } - - public boolean isReady() { - throw new UnsupportedOperationException("Not supported yet."); - } - - public void setReadListener(ReadListener arg0) { - throw new UnsupportedOperationException("Not supported yet."); - } - } -} diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java deleted file mode 100644 index 7fda74cf0..000000000 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java +++ /dev/null @@ -1,732 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import static org.fest.reflect.core.Reflection.field; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.doThrow; - -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.reflect.Type; -import java.net.HttpURLConnection; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.URL; -import java.net.UnknownHostException; -import java.security.SecureRandom; -import java.security.cert.CertificateException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.Query; -import javax.management.QueryExp; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import javax.servlet.http.HttpServletResponse; -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.JSONEvent; -import org.apache.flume.instrumentation.SourceCounter; -import org.apache.flume.util.Whitebox; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpOptions; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.methods.HttpTrace; -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.HttpClientBuilder; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * - */ -public class TestHTTPSource { - - private static HTTPSource httpSource; - private static HTTPSource httpsSource; - private static HTTPSource httpsGlobalKeystoreSource; - - private static Channel httpChannel; - private static Channel httpsChannel; - private static Channel httpsGlobalKeystoreChannel; - private static int httpPort; - private static int httpsPort; - private static int httpsGlobalKeystorePort; - private HttpClient httpClient; - private HttpPost postRequest; - - private static int findFreePort() throws IOException { - ServerSocket socket = new ServerSocket(0); - int port = socket.getLocalPort(); - socket.close(); - return port; - } - - private static Context getDefaultNonSecureContext(int port) throws IOException { - Context ctx = new Context(); - ctx.put(HTTPSourceConfigurationConstants.CONFIG_BIND, "0.0.0.0"); - ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); - ctx.put("QueuedThreadPool.MaxThreads", "100"); - return ctx; - } - - private static Context getDefaultSecureContext(int port) throws IOException { - Context sslContext = new Context(); - sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); - sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); - sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password"); - sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore"); - return sslContext; - } - - private static Context getDefaultSecureContextGlobalKeystore(int port) throws IOException { - System.setProperty("javax.net.ssl.keyStore", "src/test/resources/jettykeystore"); - System.setProperty("javax.net.ssl.keyStorePassword", "password"); - - Context sslContext = new Context(); - sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); - sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); - return sslContext; - } - - @BeforeClass - public static void setUpClass() throws Exception { - httpSource = new HTTPSource(); - httpChannel = new MemoryChannel(); - httpPort = findFreePort(); - configureSourceAndChannel(httpSource, httpChannel, getDefaultNonSecureContext(httpPort)); - httpChannel.start(); - httpSource.start(); - - httpsSource = new HTTPSource(); - httpsChannel = new MemoryChannel(); - httpsPort = findFreePort(); - configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(httpsPort)); - httpsChannel.start(); - httpsSource.start(); - - httpsGlobalKeystoreSource = new HTTPSource(); - httpsGlobalKeystoreChannel = new MemoryChannel(); - httpsGlobalKeystorePort = findFreePort(); - configureSourceAndChannel( - httpsGlobalKeystoreSource, - httpsGlobalKeystoreChannel, - getDefaultSecureContextGlobalKeystore(httpsGlobalKeystorePort)); - httpsGlobalKeystoreChannel.start(); - httpsGlobalKeystoreSource.start(); - - System.clearProperty("javax.net.ssl.keyStore"); - System.clearProperty("javax.net.ssl.keyStorePassword"); - } - - private static void configureSourceAndChannel(HTTPSource source, Channel channel, Context context) { - Context channelContext = new Context(); - channelContext.put("capacity", "100"); - Configurables.configure(channel, channelContext); - Configurables.configure(source, context); - - ChannelSelector rcs1 = new ReplicatingChannelSelector(); - rcs1.setChannels(Collections.singletonList(channel)); - - source.setChannelProcessor(new ChannelProcessor(rcs1)); - } - - @AfterClass - public static void tearDownClass() throws Exception { - httpSource.stop(); - httpChannel.stop(); - httpsSource.stop(); - httpsChannel.stop(); - httpsGlobalKeystoreSource.stop(); - httpsGlobalKeystoreChannel.stop(); - } - - @Before - public void setUp() { - HttpClientBuilder builder = HttpClientBuilder.create(); - httpClient = builder.build(); - postRequest = new HttpPost("http://0.0.0.0:" + httpPort); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); - sc.start(); - } - - @After - public void tearDown() { - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); - sc.stop(); - } - - @Test - public void testSimple() throws IOException, InterruptedException { - - StringEntity input = new StringEntity("[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]"); - // if we do not set the content type to JSON, the client will use - // ISO-8859-1 as the charset. JSON standard does not support this. - input.setContentType("application/json"); - postRequest.setEntity(input); - - HttpResponse response = httpClient.execute(postRequest); - - Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode()); - Transaction tx = httpChannel.getTransaction(); - tx.begin(); - Event e = httpChannel.take(); - Assert.assertNotNull(e); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - - e = httpChannel.take(); - Assert.assertNotNull(e); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); - tx.commit(); - tx.close(); - } - - @Test - public void testTrace() throws Exception { - doTestForbidden(new HttpTrace("http://0.0.0.0:" + httpPort)); - } - - @Test - public void testOptions() throws Exception { - doTestForbidden(new HttpOptions("http://0.0.0.0:" + httpPort)); - } - - private void doTestForbidden(HttpRequestBase request) throws Exception { - HttpResponse response = httpClient.execute(request); - Assert.assertEquals( - HttpServletResponse.SC_FORBIDDEN, response.getStatusLine().getStatusCode()); - } - - @Test - public void testSimpleUTF16() throws IOException, InterruptedException { - - StringEntity input = new StringEntity( - "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]", - "UTF-16"); - input.setContentType("application/json; charset=utf-16"); - postRequest.setEntity(input); - - HttpResponse response = httpClient.execute(postRequest); - - Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode()); - Transaction tx = httpChannel.getTransaction(); - tx.begin(); - Event e = httpChannel.take(); - Assert.assertNotNull(e); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16")); - - e = httpChannel.take(); - Assert.assertNotNull(e); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16")); - tx.commit(); - tx.close(); - } - - @Test - public void testInvalid() throws Exception { - StringEntity input = new StringEntity("[{\"a\": \"b\",[\"d\":\"e\"],\"body\": \"random_body\"}," - + "{\"e\": \"f\",\"body\": \"random_body2\"}]"); - input.setContentType("application/json"); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - - Assert.assertEquals( - HttpServletResponse.SC_BAD_REQUEST, response.getStatusLine().getStatusCode()); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); - Assert.assertEquals(1, sc.getEventReadFail()); - } - - @Test - public void testBigBatchDeserializationUTF8() throws Exception { - testBatchWithVariousEncoding("UTF-8"); - } - - @Test - public void testBigBatchDeserializationUTF16() throws Exception { - testBatchWithVariousEncoding("UTF-16"); - } - - @Test - public void testBigBatchDeserializationUTF32() throws Exception { - testBatchWithVariousEncoding("UTF-32"); - } - - @Test - public void testCounterGenericFail() throws Exception { - ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); - doThrow(new RuntimeException("dummy")).when(cp).processEventBatch(anyList()); - ChannelProcessor oldCp = httpSource.getChannelProcessor(); - httpSource.setChannelProcessor(cp); - testBatchWithVariousEncoding("UTF-8"); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); - Assert.assertEquals(1, sc.getGenericProcessingFail()); - httpSource.setChannelProcessor(oldCp); - } - - @Test - public void testSingleEvent() throws Exception { - StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":" + " \"random_body\"}]"); - input.setContentType("application/json"); - postRequest.setEntity(input); - - httpClient.execute(postRequest); - Transaction tx = httpChannel.getTransaction(); - tx.begin(); - Event e = httpChannel.take(); - Assert.assertNotNull(e); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - tx.commit(); - tx.close(); - } - - /** - * First test that the unconfigured behaviour is as-expected, then add configurations - * to a new channel and observe the difference. - * For some of the properties, the most convenient way to test is using the MBean interface - * We test all of HttpConfiguration, ServerConnector, QueuedThreadPool and SslContextFactory - * sub-configurations (but not all properties) - */ - @Test - public void testConfigurables() throws Exception { - StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":" + " \"random_body\"}]"); - input.setContentType("application/json"); - postRequest.setEntity(input); - - HttpResponse resp = httpClient.execute(postRequest); - - // Testing default behaviour (to not provided X-Powered-By, but to provide Server headers) - Assert.assertTrue(resp.getHeaders("X-Powered-By").length == 0); - Assert.assertTrue(resp.getHeaders("Server").length == 1); - - Transaction tx = httpChannel.getTransaction(); - tx.begin(); - Event e = httpChannel.take(); - Assert.assertNotNull(e); - tx.commit(); - tx.close(); - Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*", "maxThreads", 123) - .size() - == 0); - Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", "acceptQueueSize", 22) - .size() - == 0); - - int newPort = findFreePort(); - Context configuredSourceContext = getDefaultNonSecureContext(newPort); - configuredSourceContext.put("HttpConfiguration.sendServerVersion", "false"); - configuredSourceContext.put("HttpConfiguration.sendXPoweredBy", "true"); - configuredSourceContext.put("ServerConnector.acceptQueueSize", "22"); - configuredSourceContext.put("QueuedThreadPool.maxThreads", "123"); - - HTTPSource newSource = new HTTPSource(); - Channel newChannel = new MemoryChannel(); - configureSourceAndChannel(newSource, newChannel, configuredSourceContext); - newChannel.start(); - newSource.start(); - - HttpPost newPostRequest = new HttpPost("http://0.0.0.0:" + newPort); - - resp = httpClient.execute(newPostRequest); - Assert.assertTrue(resp.getHeaders("X-Powered-By").length > 0); - Assert.assertTrue(resp.getHeaders("Server").length == 0); - Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*", "maxThreads", 123) - .size() - == 1); - Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", "acceptQueueSize", 22) - .size() - == 1); - - newSource.stop(); - newChannel.stop(); - - // Configure SslContextFactory with junk protocols (expect failure) - newPort = findFreePort(); - configuredSourceContext = getDefaultSecureContext(newPort); - configuredSourceContext.put("SslContextFactory.IncludeProtocols", "abc def"); - - newSource = new HTTPSource(); - newChannel = new MemoryChannel(); - - configureSourceAndChannel(newSource, newChannel, configuredSourceContext); - - newChannel.start(); - newSource.start(); - - newPostRequest = new HttpPost("http://0.0.0.0:" + newPort); - try { - doTestHttps(null, newPort, httpsChannel); - // We are testing that this fails because we've deliberately configured the wrong protocols - Assert.assertTrue(false); - } catch (AssertionError ex) { - // no-op - } - newSource.stop(); - newChannel.stop(); - } - - @Test - public void testFullChannel() throws Exception { - HttpResponse response = putWithEncoding("UTF-8", 150).response; - Assert.assertEquals( - HttpServletResponse.SC_SERVICE_UNAVAILABLE, - response.getStatusLine().getStatusCode()); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); - Assert.assertEquals(1, sc.getChannelWriteFail()); - } - - @Test - public void testFail() throws Exception { - HTTPSourceHandler handler = - field("handler").ofType(HTTPSourceHandler.class).in(httpSource).get(); - // Cause an exception in the source - this is equivalent to any exception - // thrown by the handler since the handler is called inside a try-catch - field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(null); - HttpResponse response = putWithEncoding("UTF-8", 1).response; - Assert.assertEquals( - HttpServletResponse.SC_INTERNAL_SERVER_ERROR, - response.getStatusLine().getStatusCode()); - // Set the original handler back so tests don't fail after this runs. - field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(handler); - } - - @Test - public void testMBeans() throws Exception { - MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = new ObjectName("org.eclipse.jetty.*:*"); - Set<ObjectInstance> queryMBeans = mbeanServer.queryMBeans(objectName, null); - Assert.assertTrue(queryMBeans.size() > 0); - } - - @Test - public void testHandlerThrowingException() throws Exception { - // This will cause the handler to throw an - // UnsupportedCharsetException. - HttpResponse response = putWithEncoding("ISO-8859-1", 150).response; - Assert.assertEquals( - HttpServletResponse.SC_INTERNAL_SERVER_ERROR, - response.getStatusLine().getStatusCode()); - } - - private Set<ObjectInstance> findMBeans(String name, String attribute, int value) - throws MalformedObjectNameException { - MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = new ObjectName(name); - QueryExp q = Query.eq(Query.attr(attribute), Query.value(value)); - return mbeanServer.queryMBeans(objectName, q); - } - - private ResultWrapper putWithEncoding(String encoding, int n) throws Exception { - Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); - List<JSONEvent> events = new ArrayList<JSONEvent>(); - Random rand = new Random(); - for (int i = 0; i < n; i++) { - Map<String, String> input = Maps.newHashMap(); - for (int j = 0; j < 10; j++) { - input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); - } - JSONEvent e = new JSONEvent(); - e.setHeaders(input); - e.setBody(String.valueOf(rand.nextGaussian()).getBytes(encoding)); - events.add(e); - } - Gson gson = new Gson(); - String json = gson.toJson(events, listType); - StringEntity input = new StringEntity(json); - input.setContentType("application/json; charset=" + encoding); - postRequest.setEntity(input); - HttpResponse resp = httpClient.execute(postRequest); - return new ResultWrapper(resp, events); - } - - @Test - public void testHttps() throws Exception { - doTestHttps(null, httpsPort, httpsChannel); - } - - @Test(expected = javax.net.ssl.SSLHandshakeException.class) - public void testHttpsSSLv3() throws Exception { - doTestHttps("SSLv3", httpsPort, httpsChannel); - } - - @Test - public void testHttpsGlobalKeystore() throws Exception { - doTestHttps(null, httpsGlobalKeystorePort, httpsGlobalKeystoreChannel); - } - - private void doTestHttps(String protocol, int port, Channel channel) throws Exception { - Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); - List<JSONEvent> events = new ArrayList<JSONEvent>(); - Random rand = new Random(); - for (int i = 0; i < 10; i++) { - Map<String, String> input = Maps.newHashMap(); - for (int j = 0; j < 10; j++) { - input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); - } - input.put("MsgNum", String.valueOf(i)); - JSONEvent e = new JSONEvent(); - e.setHeaders(input); - e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); - events.add(e); - } - Gson gson = new Gson(); - String json = gson.toJson(events, listType); - HttpsURLConnection httpsURLConnection = null; - Transaction transaction = null; - try { - TrustManager[] trustAllCerts = { - new X509TrustManager() { - @Override - public void checkClientTrusted(java.security.cert.X509Certificate[] x509Certificates, String s) - throws CertificateException { - // noop - } - - @Override - public void checkServerTrusted(java.security.cert.X509Certificate[] x509Certificates, String s) - throws CertificateException { - // noop - } - - public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; - } - } - }; - - SSLContext sc = null; - javax.net.ssl.SSLSocketFactory factory = null; - if (System.getProperty("java.vendor").contains("IBM")) { - sc = SSLContext.getInstance("SSL_TLS"); - } else { - sc = SSLContext.getInstance("SSL"); - } - - HostnameVerifier hv = new HostnameVerifier() { - public boolean verify(String arg0, SSLSession arg1) { - return true; - } - }; - sc.init(null, trustAllCerts, new SecureRandom()); - - if (protocol != null) { - factory = new DisabledProtocolsSocketFactory(sc.getSocketFactory(), protocol); - } else { - factory = sc.getSocketFactory(); - } - HttpsURLConnection.setDefaultSSLSocketFactory(factory); - HttpsURLConnection.setDefaultHostnameVerifier(NoopHostnameVerifier.INSTANCE); - URL sslUrl = new URL("https://0.0.0.0:" + port); - httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection(); - httpsURLConnection.setDoInput(true); - httpsURLConnection.setDoOutput(true); - httpsURLConnection.setRequestMethod("POST"); - httpsURLConnection.getOutputStream().write(json.getBytes()); - - int statusCode = httpsURLConnection.getResponseCode(); - Assert.assertEquals(200, statusCode); - - transaction = channel.getTransaction(); - transaction.begin(); - for (int i = 0; i < 10; i++) { - Event e = channel.take(); - Assert.assertNotNull(e); - Assert.assertEquals(String.valueOf(i), e.getHeaders().get("MsgNum")); - } - - } finally { - if (transaction != null) { - transaction.commit(); - transaction.close(); - } - httpsURLConnection.disconnect(); - } - } - - @Test - public void testHttpsSourceNonHttpsClient() throws Exception { - Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); - List<JSONEvent> events = new ArrayList<JSONEvent>(); - Random rand = new Random(); - for (int i = 0; i < 10; i++) { - Map<String, String> input = Maps.newHashMap(); - for (int j = 0; j < 10; j++) { - input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); - } - input.put("MsgNum", String.valueOf(i)); - JSONEvent e = new JSONEvent(); - e.setHeaders(input); - e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); - events.add(e); - } - Gson gson = new Gson(); - String json = gson.toJson(events, listType); - HttpURLConnection httpURLConnection = null; - try { - URL url = new URL("http://0.0.0.0:" + httpsPort); - httpURLConnection = (HttpURLConnection) url.openConnection(); - httpURLConnection.setDoInput(true); - httpURLConnection.setDoOutput(true); - httpURLConnection.setRequestMethod("POST"); - httpURLConnection.getOutputStream().write(json.getBytes()); - httpURLConnection.getResponseCode(); - - Assert.fail("HTTP Client cannot connect to HTTPS source"); - } catch (Exception exception) { - Assert.assertTrue("Exception expected", true); - } finally { - httpURLConnection.disconnect(); - } - } - - private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) throws Exception { - Transaction tx = httpChannel.getTransaction(); - tx.begin(); - Event e = null; - int i = 0; - while (true) { - e = httpChannel.take(); - if (e == null) { - break; - } - Event current = events.get(i++); - Assert.assertEquals(new String(current.getBody(), encoding), new String(e.getBody(), encoding)); - Assert.assertEquals(current.getHeaders(), e.getHeaders()); - } - Assert.assertEquals(n, events.size()); - tx.commit(); - tx.close(); - } - - private void testBatchWithVariousEncoding(String encoding) throws Exception { - testBatchWithVariousEncoding(encoding, 50); - } - - private void testBatchWithVariousEncoding(String encoding, int n) throws Exception { - List<JSONEvent> events = putWithEncoding(encoding, n).events; - takeWithEncoding(encoding, n, events); - } - - private class ResultWrapper { - public final HttpResponse response; - public final List<JSONEvent> events; - - public ResultWrapper(HttpResponse resp, List<JSONEvent> events) { - this.response = resp; - this.events = events; - } - } - - private class DisabledProtocolsSocketFactory extends javax.net.ssl.SSLSocketFactory { - - private final javax.net.ssl.SSLSocketFactory socketFactory; - private final String[] protocols; - - DisabledProtocolsSocketFactory(javax.net.ssl.SSLSocketFactory factory, String protocol) { - this.socketFactory = factory; - protocols = new String[1]; - protocols[0] = protocol; - } - - @Override - public String[] getDefaultCipherSuites() { - return socketFactory.getDefaultCipherSuites(); - } - - @Override - public String[] getSupportedCipherSuites() { - return socketFactory.getSupportedCipherSuites(); - } - - @Override - public Socket createSocket(Socket socket, String s, int i, boolean b) throws IOException { - SSLSocket sc = (SSLSocket) socketFactory.createSocket(socket, s, i, b); - sc.setEnabledProtocols(protocols); - return sc; - } - - @Override - public Socket createSocket(String s, int i) throws IOException, UnknownHostException { - SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i); - sc.setEnabledProtocols(protocols); - return sc; - } - - @Override - public Socket createSocket(String s, int i, InetAddress inetAddress, int i2) - throws IOException, UnknownHostException { - SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i, inetAddress, i2); - sc.setEnabledProtocols(protocols); - return sc; - } - - @Override - public Socket createSocket(InetAddress inetAddress, int i) throws IOException { - SSLSocket sc = (SSLSocket) socketFactory.createSocket(inetAddress, i); - sc.setEnabledProtocols(protocols); - return sc; - } - - @Override - public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress2, int i2) - throws IOException { - SSLSocket sc = (SSLSocket) socketFactory.createSocket( - inetAddress, i, - inetAddress2, i2); - sc.setEnabledProtocols(protocols); - return sc; - } - } -} diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java deleted file mode 100644 index d5f90f7e2..000000000 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.source.http; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import java.lang.reflect.Type; -import java.nio.charset.UnsupportedCharsetException; -import java.util.List; -import java.util.Map; -import java.util.Random; -import javax.servlet.http.HttpServletRequest; -import junit.framework.Assert; -import org.apache.flume.Event; -import org.apache.flume.event.JSONEvent; -import org.junit.Before; -import org.junit.Test; - -/** - * - */ -public class TestJSONHandler { - - HTTPSourceHandler handler; - - @Before - public void setUp() { - handler = new JSONHandler(); - } - - @Test - public void testMultipleEvents() throws Exception { - String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testMultipleEventsUTF16() throws Exception { - String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-16"); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16")); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16")); - } - - @Test - public void testMultipleEventsUTF32() throws Exception { - String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-32"); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-32")); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-32")); - } - - @Test - public void testMultipleEventsUTF8() throws Exception { - String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," - + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-8"); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testEscapedJSON() throws Exception { - // JSON allows escaping double quotes to add it in the data. - String json = "[{\"headers\":{\"a\": \"b\"}}," + "{\"headers\":{\"e\": \"f\"},\"body\": \"rand\\\"om_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertTrue(e.getBody().length == 0); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("rand\"om_body2", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testNoBody() throws Exception { - String json = "[{\"headers\" : {\"a\": \"b\"}}," + "{\"headers\" : {\"e\": \"f\"},\"body\": \"random_body2\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertTrue(e.getBody().length == 0); - e = deserialized.get(1); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testSingleHTMLEvent() throws Exception { - String json = "[{\"headers\": {\"a\": \"b\"}," + "\"body\": \"<html><body>test</body></html>\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("<html><body>test</body></html>", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testSingleEvent() throws Exception { - String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": \"random_body\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - } - - @Test(expected = HTTPBadRequestException.class) - public void testBadEvent() throws Exception { - String json = "{[\"a\": \"b\"],\"body\": \"random_body\"}"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - handler.getEvents(req); - Assert.fail(); - } - - @Test(expected = UnsupportedCharsetException.class) - public void testError() throws Exception { - String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": \"random_body\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "ISO-8859-1"); - handler.getEvents(req); - Assert.fail(); - } - - @Test - public void testSingleEventInArray() throws Exception { - String json = "[{\"headers\": {\"a\": \"b\"},\"body\": \"random_body\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testMultipleLargeEvents() throws Exception { - String json = "[{\"headers\" : {\"a\": \"b\", \"a2\": \"b2\"," - + "\"a3\": \"b3\",\"a4\": \"b4\"},\"body\": \"random_body\"}," - + "{\"headers\" :{\"e\": \"f\",\"e2\": \"f2\"," - + "\"e3\": \"f3\",\"e4\": \"f4\",\"e5\": \"f5\"}," - + "\"body\": \"random_body2\"}," - + "{\"headers\" :{\"q1\": \"b\",\"q2\": \"b2\",\"q3\": \"b3\",\"q4\": \"b4\"}," - + "\"body\": \"random_bodyq\"}]"; - HttpServletRequest req = new FlumeHttpServletRequestWrapper(json); - List<Event> deserialized = handler.getEvents(req); - Event e = deserialized.get(0); - Assert.assertNotNull(e); - Assert.assertEquals("b", e.getHeaders().get("a")); - Assert.assertEquals("b2", e.getHeaders().get("a2")); - Assert.assertEquals("b3", e.getHeaders().get("a3")); - Assert.assertEquals("b4", e.getHeaders().get("a4")); - Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - e = deserialized.get(1); - Assert.assertNotNull(e); - Assert.assertEquals("f", e.getHeaders().get("e")); - Assert.assertEquals("f2", e.getHeaders().get("e2")); - Assert.assertEquals("f3", e.getHeaders().get("e3")); - Assert.assertEquals("f4", e.getHeaders().get("e4")); - Assert.assertEquals("f5", e.getHeaders().get("e5")); - Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); - e = deserialized.get(2); - Assert.assertNotNull(e); - Assert.assertEquals("b", e.getHeaders().get("q1")); - Assert.assertEquals("b2", e.getHeaders().get("q2")); - Assert.assertEquals("b3", e.getHeaders().get("q3")); - Assert.assertEquals("b4", e.getHeaders().get("q4")); - Assert.assertEquals("random_bodyq", new String(e.getBody(), "UTF-8")); - } - - @Test - public void testDeserializarion() throws Exception { - Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); - List<JSONEvent> events = Lists.newArrayList(); - Random rand = new Random(); - for (int i = 1; i < 10; i++) { - Map<String, String> input = Maps.newHashMap(); - for (int j = 1; j < 10; j++) { - input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); - } - JSONEvent e = new JSONEvent(); - e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); - e.setHeaders(input); - events.add(e); - } - Gson gson = new Gson(); - List<Event> deserialized = handler.getEvents(new FlumeHttpServletRequestWrapper(gson.toJson(events, listType))); - int i = 0; - for (Event e : deserialized) { - Event current = events.get(i++); - Assert.assertEquals(new String(current.getBody(), "UTF-8"), new String(e.getBody(), "UTF-8")); - Assert.assertEquals(current.getHeaders(), e.getHeaders()); - } - } -} diff --git a/flume-parent/pom.xml b/flume-parent/pom.xml index 90fcb68e0..3ef43cb53 100644 --- a/flume-parent/pom.xml +++ b/flume-parent/pom.xml @@ -254,7 +254,8 @@ <httpcore.version>4.4.15</httpcore.version> <httpclient.version>4.5.13</httpclient.version> <irclib.version>1.10</irclib.version> - <jetty.version>9.4.51.v20230217</jetty.version> + <jakarta-servlet.version>6.1.0</jakarta-servlet.version> + <jetty.version>12.1.9</jetty.version> <junit.version>4.13.2</junit.version> <log4j.version>2.26.0</log4j.version> <mapdb.version>0.9.9</mapdb.version> @@ -280,7 +281,7 @@ <netty-all.version>4.1.86.Final</netty-all.version> <external.protobuf.version>4.35.0</external.protobuf.version> <protobuf.plugin.version>0.6.1</protobuf.plugin.version> - <prometheus.version>0.15.0</prometheus.version> + <prometheus.version>1.7.0</prometheus.version> <rat.version>0.12</rat.version> <snappy-java.version>1.1.8.4</snappy-java.version> <slf4j.version>1.7.32</slf4j.version> @@ -409,8 +410,16 @@ </dependency> <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> + <groupId>jakarta.servlet</groupId> + <artifactId>jakarta.servlet-api</artifactId> + <!-- Use 5.0.0 for Jetty 11 / EE9. Use 6.0.0 or 6.1.0 for Jetty 12 (EE10) --> + <version>${jakarta-servlet.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty.ee11</groupId> + <artifactId>jetty-ee11-servlet</artifactId> <version>${jetty.version}</version> </dependency> @@ -554,13 +563,13 @@ <dependency> <groupId>io.prometheus</groupId> - <artifactId>simpleclient</artifactId> + <artifactId>prometheus-metrics-core</artifactId> <version>${prometheus.version}</version> </dependency> <dependency> <groupId>io.prometheus</groupId> - <artifactId>simpleclient_servlet</artifactId> + <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId> <version>${prometheus.version}</version> </dependency>
