This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 87e92122fc NIFI-14386 Remove ListenHTTP Max Data to Receive per Second
property and document the replacement Use Case (#9823)
87e92122fc is described below
commit 87e92122fc9d18747a828d6e66e50df4bd05820d
Author: Michael Moser <[email protected]>
AuthorDate: Wed Mar 26 21:11:46 2025 -0400
NIFI-14386 Remove ListenHTTP Max Data to Receive per Second property and
document the replacement Use Case (#9823)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/stream/io/LeakyBucketStreamThrottler.java | 332 ---------------------
.../org/apache/nifi/stream/io/StreamThrottler.java | 33 --
.../nifi/stream/io/TestLeakyBucketThrottler.java | 155 ----------
.../nifi/processors/standard/ListenHTTP.java | 65 ++--
.../standard/servlets/ListenHTTPServlet.java | 21 +-
5 files changed, 43 insertions(+), 563 deletions(-)
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
deleted file mode 100644
index 6bff25753b..0000000000
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class LeakyBucketStreamThrottler implements StreamThrottler {
-
- private final int maxBytesPerSecond;
- private final BlockingQueue<Request> requestQueue = new
LinkedBlockingQueue<>();
- private final ScheduledExecutorService executorService;
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
- public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
- this.maxBytesPerSecond = maxBytesPerSecond;
-
- executorService = Executors.newSingleThreadScheduledExecutor();
- final Runnable task = new Drain();
- executorService.scheduleAtFixedRate(task, 0, 1000,
TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close() {
- this.shutdown.set(true);
-
- executorService.shutdown();
- try {
- // Should not take more than 2 seconds because we run every
second. If it takes more than
- // 2 seconds, it is because the Runnable thread is blocking on a
write; in this case,
- // we will just ignore it and return
- executorService.awaitTermination(2, TimeUnit.SECONDS);
- } catch (InterruptedException ignored) {
- }
- }
-
- @Override
- public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
- return new OutputStream() {
- @Override
- public void write(final int b) throws IOException {
- write(new byte[]{(byte) b}, 0, 1);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- final InputStream in = new ByteArrayInputStream(b, off, len);
- LeakyBucketStreamThrottler.this.copy(in, toWrap);
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public void flush() throws IOException {
- toWrap.flush();
- }
- };
- }
-
- @Override
- public InputStream newThrottledInputStream(final InputStream toWrap) {
- return new InputStream() {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- @Override
- public int read() throws IOException {
- final ByteArrayOutputStream baos = new
ByteArrayOutputStream(1);
- LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
- if (baos.size() < 1) {
- return -1;
- }
-
- return baos.toByteArray()[0] & 0xFF;
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- if (b.length == 0) {
- return 0;
- }
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (len < 0) {
- throw new IllegalArgumentException();
- }
- if (len == 0) {
- return 0;
- }
-
- baos.reset();
- final int copied = (int)
LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
- if (copied == 0) {
- return -1;
- }
- System.arraycopy(baos.toByteArray(), 0, b, off, copied);
- return copied;
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public int available() throws IOException {
- return toWrap.available();
- }
- };
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out) throws
IOException {
- return copy(in, out, -1);
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out, final long
maxBytes) throws IOException {
- long totalBytesCopied = 0;
- boolean finished = false;
- while (!finished) {
- final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes
- totalBytesCopied;
- final Request request = new Request(in, out, requestMax);
- boolean transferred = false;
- while (!transferred) {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
-
- try {
- transferred = requestQueue.offer(request, 1000,
TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- final BlockingQueue<Response> responseQueue =
request.getResponseQueue();
- Response response = null;
- while (response == null) {
- try {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
- response = responseQueue.poll(1000L,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- if (!response.isSuccess()) {
- throw response.getError();
- }
-
- totalBytesCopied += response.getBytesCopied();
- finished = (response.getBytesCopied() == 0) || (totalBytesCopied
>= maxBytes && maxBytes > 0);
- }
-
- return totalBytesCopied;
- }
-
- /**
- * This class is responsible for draining water from the leaky bucket.
I.e., it actually moves the data
- */
- private class Drain implements Runnable {
-
- private final byte[] buffer;
-
- public Drain() {
- final int bufferSize = Math.min(4096, maxBytesPerSecond);
- buffer = new byte[bufferSize];
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
-
- int bytesTransferred = 0;
- while (bytesTransferred < maxBytesPerSecond) {
- final long maxMillisToWait = 1000 -
(System.currentTimeMillis() - start);
- if (maxMillisToWait < 1) {
- return;
- }
-
- try {
- final Request request = requestQueue.poll(maxMillisToWait,
TimeUnit.MILLISECONDS);
- if (request == null) {
- return;
- }
-
- final BlockingQueue<Response> responseQueue =
request.getResponseQueue();
-
- final OutputStream out = request.getOutputStream();
- final InputStream in = request.getInputStream();
-
- try {
- final long requestMax = request.getMaxBytesToCopy();
- long maxBytesToTransfer;
- if (requestMax < 0) {
- maxBytesToTransfer = Math.min(buffer.length,
maxBytesPerSecond - bytesTransferred);
- } else {
- maxBytesToTransfer = Math.min(requestMax,
- Math.min(buffer.length, maxBytesPerSecond
- bytesTransferred));
- }
- maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
-
- final int bytesCopied = fillBuffer(in,
maxBytesToTransfer);
- out.write(buffer, 0, bytesCopied);
-
- final Response response = new Response(true,
bytesCopied);
- responseQueue.put(response);
- bytesTransferred += bytesCopied;
- } catch (final IOException e) {
- final Response response = new Response(e);
- responseQueue.put(response);
- }
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- private int fillBuffer(final InputStream in, final long maxBytes)
throws IOException {
- int bytesRead = 0;
- int len;
- while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead,
(int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
- bytesRead += len;
- }
-
- return bytesRead;
- }
- }
-
- private static class Response {
-
- private final boolean success;
- private final IOException error;
- private final int bytesCopied;
-
- public Response(final boolean success, final int bytesCopied) {
- this.success = success;
- this.bytesCopied = bytesCopied;
- this.error = null;
- }
-
- public Response(final IOException error) {
- this.success = false;
- this.error = error;
- this.bytesCopied = -1;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public IOException getError() {
- return error;
- }
-
- public int getBytesCopied() {
- return bytesCopied;
- }
- }
-
- private static class Request {
-
- private final OutputStream out;
- private final InputStream in;
- private final long maxBytesToCopy;
- private final BlockingQueue<Response> responseQueue;
-
- public Request(final InputStream in, final OutputStream out, final
long maxBytesToCopy) {
- this.out = out;
- this.in = in;
- this.maxBytesToCopy = maxBytesToCopy;
- this.responseQueue = new LinkedBlockingQueue<>(1);
- }
-
- public BlockingQueue<Response> getResponseQueue() {
- return this.responseQueue;
- }
-
- public OutputStream getOutputStream() {
- return out;
- }
-
- public InputStream getInputStream() {
- return in;
- }
-
- public long getMaxBytesToCopy() {
- return maxBytesToCopy;
- }
-
- @Override
- public String toString() {
- return "Request[maxBytes=" + maxBytesToCopy + "]";
- }
- }
-
-}
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
deleted file mode 100644
index 9158050d03..0000000000
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public interface StreamThrottler extends Closeable {
-
- long copy(InputStream in, OutputStream out) throws IOException;
-
- long copy(InputStream in, OutputStream out, long maxBytes) throws
IOException;
-
- InputStream newThrottledInputStream(final InputStream toWrap);
-
- OutputStream newThrottledOutputStream(final OutputStream toWrap);
-}
diff --git
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
deleted file mode 100644
index a28c92e1d4..0000000000
---
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Tests are time-based")
-public class TestLeakyBucketThrottler {
-
- private static final Logger logger =
LoggerFactory.getLogger(TestLeakyBucketThrottler.class);
-
- @Test
- @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
- public void testOutputStreamInterface() throws IOException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new
LeakyBucketStreamThrottler(1024 * 1024);
-
- final byte[] data = new byte[1024 * 1024 * 4];
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final OutputStream throttledOut =
throttler.newThrottledOutputStream(baos)) {
-
- final long start = System.currentTimeMillis();
- throttledOut.write(data);
- throttler.close();
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- }
- }
-
- @Test
- @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
- public void testInputStreamInterface() throws IOException {
-
- final byte[] data = new byte[1024 * 1024 * 4];
- // throttle rate at 1 MB/sec
- try ( final LeakyBucketStreamThrottler throttler = new
LeakyBucketStreamThrottler(1024 * 1024);
- final ByteArrayInputStream bais = new
ByteArrayInputStream(data);
- final InputStream throttledIn =
throttler.newThrottledInputStream(bais);
- final ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
-
- final byte[] buffer = new byte[4096];
- final long start = System.currentTimeMillis();
- int len;
- while ((len = throttledIn.read(buffer)) > 0) {
- baos.write(buffer, 0, len);
- }
-
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- }
- }
-
- @Test
- @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
- public void testDirectInterface() throws IOException, InterruptedException
{
- // throttle rate at 1 MB/sec
- try (final LeakyBucketStreamThrottler throttler = new
LeakyBucketStreamThrottler(1024 * 1024);
- final ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
- // create 3 threads, each sending ~2 MB
- final List<Thread> threads = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- final Thread t = new WriterThread(i, throttler, baos);
- threads.add(t);
- }
-
- final long start = System.currentTimeMillis();
- for (final Thread t : threads) {
- t.start();
- }
-
- for (final Thread t : threads) {
- t.join();
- }
- final long elapsed = System.currentTimeMillis() - start;
-
- throttler.close();
-
- // To send 15 MB, it should have taken at least 5 seconds and no
more than 7 seconds, to
- // allow for busy-ness and the fact that we could write a tiny bit
more than the limit.
- assertTrue(elapsed > 5000);
- assertTrue(elapsed < 7000);
-
- // ensure bytes were copied out appropriately
- assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size());
- assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]);
- }
- }
-
- private static class WriterThread extends Thread {
-
- private final int idx;
- private final byte[] data = new byte[1024 * 1024 * 2 + 1];
- private final LeakyBucketStreamThrottler throttler;
- private final OutputStream out;
-
- public WriterThread(final int idx, final LeakyBucketStreamThrottler
throttler, final OutputStream out) {
- this.idx = idx;
- this.throttler = throttler;
- this.out = out;
- this.data[this.data.length - 1] = (byte) 'A';
- }
-
- @Override
- public void run() {
- long startMillis = System.currentTimeMillis();
- long bytesWritten = 0L;
- try {
- throttler.copy(new ByteArrayInputStream(data), out);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- long now = System.currentTimeMillis();
- long millisElapsed = now - startMillis;
- bytesWritten += data.length;
- float bytesPerSec = (float) bytesWritten / (float) millisElapsed *
1000F;
- logger.info("{} : copied data at a rate of {} bytes/sec", idx,
bytesPerSec);
- }
- }
-
-}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 4007a5e52f..675120d07e 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -23,9 +23,12 @@ import jakarta.ws.rs.Path;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
@@ -37,6 +40,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import
org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -55,8 +59,6 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.ssl.SSLContextProvider;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Server;
@@ -67,7 +69,6 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
-import java.io.IOException;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
@@ -93,7 +94,7 @@ import java.util.regex.Pattern;
+ "CONNECT will also result in an error and the HTTP response status
code 400. "
+ "GET is supported on <service_URI>/healthcheck. If the service is
available, it returns \"200 OK\" with the content \"OK\". "
+ "The health check functionality can be configured to be accessible
via a different port. "
- + "For details see the documentation of the \"Listening Port for
health check requests\" property."
+ + "For details see the documentation of the \"Listening Port for
health check requests\" property. "
+ "A Record Reader and Record Writer property can be enabled on the
processor to process incoming requests as records. "
+ "Record processing is not allowed for multipart requests and request
in FlowFileV3 format (minifi).")
@UseCase(
@@ -110,6 +111,31 @@ import java.util.regex.Pattern;
The MergeContent and PackageFlowFile processors can generate
FlowFileV3 formatted data.
"""
)
+@MultiProcessorUseCase(
+ description = "Limit the data flow rate that is accepted",
+ keywords = {"rate", "limit"},
+ notes = """
+ When ListenHTTP cannot output FlowFiles due to back pressure, it
will send an HTTP 503 Service Unavailable
+ response to clients, or deny connections, until more space is
available in the output queue.
+ """,
+ configurations = {
+ @ProcessorConfiguration(
+ processorClass = ListenHTTP.class,
+ configuration = """
+ Connect the 'success' relationship of ListenHTTP to a
ControlRate processor and configure back pressure on that
+ connection so that a small amount of data will fill the
queue. The size of the back pressure configuration
+ determines how much data to buffer to handle spikes in
rate without affecting clients.
+ """
+ ),
+ @ProcessorConfiguration(
+ processorClass = ControlRate.class,
+ configuration = """
+ Use the ControlRate properties to set the desired data
flow rate limit. When the limit is reached,
+ the ControlRate input connection will start accumulating
files. When this connection is full, ListenHTTP
+ will limit the input data flow rate.
+ """)
+ }
+)
public class ListenHTTP extends AbstractSessionFactoryProcessor {
private static final String MATCH_ALL = ".*";
@@ -196,12 +222,6 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
.defaultValue("60 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
- public static final PropertyDescriptor MAX_DATA_RATE = new
PropertyDescriptor.Builder()
- .name("Max Data to Receive per Second")
- .description("The maximum amount of data to receive per second; this
allows the bandwidth to be throttled to a specified data rate; if not
specified, the data rate is not throttled")
- .required(false)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("SSL Context Service enables support for HTTPS")
@@ -302,7 +322,6 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
BASE_PATH,
PORT,
HEALTH_CHECK_PORT,
- MAX_DATA_RATE,
SSL_CONTEXT_SERVICE,
HTTP_PROTOCOL_STRATEGY,
CLIENT_AUTHENTICATION,
@@ -336,7 +355,6 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN =
"authorityIssuerPattern";
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN =
"headerPattern";
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
- public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER =
"streamThrottler";
public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE =
"multipartRequestMaxSize";
@@ -346,7 +364,6 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
private volatile Server server = null;
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap
= new ConcurrentHashMap<>();
private final AtomicReference<ProcessSessionFactory>
sessionFactoryReference = new AtomicReference<>();
- private final AtomicReference<StreamThrottler> throttlerRef = new
AtomicReference<>();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
@@ -382,17 +399,17 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
return PROPERTY_DESCRIPTORS;
}
- @OnStopped
- public void shutdownHttpServer() {
- final StreamThrottler throttler = throttlerRef.getAndSet(null);
- if (throttler != null) {
- try {
- throttler.close();
- } catch (IOException e) {
- getLogger().error("Failed to close StreamThrottler", e);
- }
+ @Override
+ public void migrateProperties(PropertyConfiguration config) {
+ super.migrateProperties(config);
+ if (config.removeProperty("Max Data to Receive per Second")) {
+ getLogger().warn("ListenHTTP rate limit feature was removed.
Please see ListenHTTP documentation for alternatives.");
}
+ }
+ @OnShutdown
+ @OnStopped
+ public void shutdownHttpServer() {
final Server toShutdown = this.server;
if (toShutdown == null) {
return;
@@ -423,14 +440,11 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY));
final String basePath =
context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue();
final SSLContextProvider sslContextProvider =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
- final Double maxBytesPerSecond =
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
- final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ?
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
final long requestMaxSize =
context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
final int readBufferSize =
context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxThreadPoolSize =
context.getProperty(MAX_THREAD_POOL_SIZE).asInteger();
final int requestHeaderSize =
context.getProperty(REQUEST_HEADER_MAX_SIZE).asDataSize(DataUnit.B).intValue();
- throttlerRef.set(streamThrottler);
final PropertyValue clientAuthenticationProperty =
context.getProperty(CLIENT_AUTHENTICATION);
final ClientAuthentication clientAuthentication =
getClientAuthentication(sslContextProvider, clientAuthenticationProperty);
@@ -491,7 +505,6 @@ public class ListenHTTP extends
AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN,
Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN,
Pattern.compile(context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN)
.isSet() ?
context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN).getValue() : MATCH_ALL));
- contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER,
streamThrottler);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE,
requestMaxSize);
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index a9b90b82e6..3e032bbfda 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -48,7 +48,6 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFileUnpackager;
import org.apache.nifi.util.FlowFileUnpackagerV1;
@@ -113,7 +112,6 @@ public class ListenHTTPServlet extends HttpServlet {
private Pattern authorizedIssuerPattern;
private Pattern headerPattern;
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
- private StreamThrottler streamThrottler;
private String basePath;
private int returnCode;
private long multipartRequestMaxSize;
@@ -133,7 +131,6 @@ public class ListenHTTPServlet extends HttpServlet {
this.authorizedIssuerPattern = (Pattern)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN);
this.headerPattern = (Pattern)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
- this.streamThrottler = (StreamThrottler)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
this.basePath = (String)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
this.returnCode = (int)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
this.multipartRequestMaxSize = (long)
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
@@ -148,7 +145,7 @@ public class ListenHTTPServlet extends HttpServlet {
}
@Override
- protected void doHead(final HttpServletRequest request, final
HttpServletResponse response) throws ServletException, IOException {
+ public void doHead(final HttpServletRequest request, final
HttpServletResponse response) throws ServletException, IOException {
if (request.getLocalPort() == port) {
response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE);
response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE);
@@ -159,7 +156,7 @@ public class ListenHTTPServlet extends HttpServlet {
}
@Override
- protected void doPost(final HttpServletRequest request, final
HttpServletResponse response) throws ServletException, IOException {
+ public void doPost(final HttpServletRequest request, final
HttpServletResponse response) throws ServletException, IOException {
if (request.getLocalPort() != port) {
super.doPost(request, response);
@@ -241,9 +238,7 @@ public class ListenHTTPServlet extends HttpServlet {
final boolean createHold =
Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER));
final String contentType = request.getContentType();
- final InputStream unthrottled = contentGzipped ? new
GZIPInputStream(request.getInputStream()) : request.getInputStream();
-
- final InputStream in = (streamThrottler == null) ? unthrottled :
streamThrottler.newThrottledInputStream(unthrottled);
+ final InputStream in = contentGzipped ? new
GZIPInputStream(request.getInputStream()) : request.getInputStream();
if (logger.isDebugEnabled()) {
logger.debug("Received request from {}, createHold={},
content-type={}, gzip={}", request.getRemoteHost(), createHold, contentType,
contentGzipped);
@@ -328,7 +323,6 @@ public class ListenHTTPServlet extends HttpServlet {
private Set<FlowFile> handleRequest(final HttpServletRequest request,
final ProcessSession session, String foundSubject, String foundIssuer,
final boolean destinationIsLegacyNiFi,
final String contentType, final InputStream in) throws IOException {
FlowFile flowFile;
- String holdUuid = null;
final AtomicBoolean hasMoreData = new AtomicBoolean(false);
final FlowFileUnpackager unpackager =
getFlowFileUnpackager(contentType);
@@ -366,13 +360,9 @@ public class ListenHTTPServlet extends HttpServlet {
attributes.put(CoreAttributes.FILENAME.key(), nameVal);
}
- String sourceSystemFlowFileIdentifier =
attributes.get(CoreAttributes.UUID.key());
+ String sourceSystemFlowFileIdentifier =
attributes.remove(CoreAttributes.UUID.key());
if (sourceSystemFlowFileIdentifier != null) {
sourceSystemFlowFileIdentifier = "urn:nifi:" +
sourceSystemFlowFileIdentifier;
-
- // If we receveied a UUID, we want to give the FlowFile a new
UUID and register the sending system's
- // identifier as the SourceSystemFlowFileIdentifier field in
the Provenance RECEIVE event
- attributes.put(CoreAttributes.UUID.key(),
UUID.randomUUID().toString());
}
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -381,9 +371,6 @@ public class ListenHTTPServlet extends HttpServlet {
session.getProvenanceReporter().receive(flowFile,
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, details,
transferMillis);
flowFileSet.add(flowFile);
- if (holdUuid == null) {
- holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- }
} while (hasMoreData.get());
return flowFileSet;
}