This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 87e92122fc NIFI-14386 Remove ListenHTTP Max Data to Receive per Second 
property and document the replacement Use Case (#9823)
87e92122fc is described below

commit 87e92122fc9d18747a828d6e66e50df4bd05820d
Author: Michael Moser <[email protected]>
AuthorDate: Wed Mar 26 21:11:46 2025 -0400

    NIFI-14386 Remove ListenHTTP Max Data to Receive per Second property and 
document the replacement Use Case (#9823)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/stream/io/LeakyBucketStreamThrottler.java | 332 ---------------------
 .../org/apache/nifi/stream/io/StreamThrottler.java |  33 --
 .../nifi/stream/io/TestLeakyBucketThrottler.java   | 155 ----------
 .../nifi/processors/standard/ListenHTTP.java       |  65 ++--
 .../standard/servlets/ListenHTTPServlet.java       |  21 +-
 5 files changed, 43 insertions(+), 563 deletions(-)

diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
deleted file mode 100644
index 6bff25753b..0000000000
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class LeakyBucketStreamThrottler implements StreamThrottler {
-
-    private final int maxBytesPerSecond;
-    private final BlockingQueue<Request> requestQueue = new 
LinkedBlockingQueue<>();
-    private final ScheduledExecutorService executorService;
-    private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
-    public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
-        this.maxBytesPerSecond = maxBytesPerSecond;
-
-        executorService = Executors.newSingleThreadScheduledExecutor();
-        final Runnable task = new Drain();
-        executorService.scheduleAtFixedRate(task, 0, 1000, 
TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void close() {
-        this.shutdown.set(true);
-
-        executorService.shutdown();
-        try {
-            // Should not take more than 2 seconds because we run every 
second. If it takes more than
-            // 2 seconds, it is because the Runnable thread is blocking on a 
write; in this case,
-            // we will just ignore it and return
-            executorService.awaitTermination(2, TimeUnit.SECONDS);
-        } catch (InterruptedException ignored) {
-        }
-    }
-
-    @Override
-    public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
-        return new OutputStream() {
-            @Override
-            public void write(final int b) throws IOException {
-                write(new byte[]{(byte) b}, 0, 1);
-            }
-
-            @Override
-            public void write(byte[] b) throws IOException {
-                write(b, 0, b.length);
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                final InputStream in = new ByteArrayInputStream(b, off, len);
-                LeakyBucketStreamThrottler.this.copy(in, toWrap);
-            }
-
-            @Override
-            public void close() throws IOException {
-                toWrap.close();
-            }
-
-            @Override
-            public void flush() throws IOException {
-                toWrap.flush();
-            }
-        };
-    }
-
-    @Override
-    public InputStream newThrottledInputStream(final InputStream toWrap) {
-        return new InputStream() {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-            @Override
-            public int read() throws IOException {
-                final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(1);
-                LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
-                if (baos.size() < 1) {
-                    return -1;
-                }
-
-                return baos.toByteArray()[0] & 0xFF;
-            }
-
-            @Override
-            public int read(final byte[] b) throws IOException {
-                if (b.length == 0) {
-                    return 0;
-                }
-                return read(b, 0, b.length);
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                if (len < 0) {
-                    throw new IllegalArgumentException();
-                }
-                if (len == 0) {
-                    return 0;
-                }
-
-                baos.reset();
-                final int copied = (int) 
LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
-                if (copied == 0) {
-                    return -1;
-                }
-                System.arraycopy(baos.toByteArray(), 0, b, off, copied);
-                return copied;
-            }
-
-            @Override
-            public void close() throws IOException {
-                toWrap.close();
-            }
-
-            @Override
-            public int available() throws IOException {
-                return toWrap.available();
-            }
-        };
-    }
-
-    @Override
-    public long copy(final InputStream in, final OutputStream out) throws 
IOException {
-        return copy(in, out, -1);
-    }
-
-    @Override
-    public long copy(final InputStream in, final OutputStream out, final long 
maxBytes) throws IOException {
-        long totalBytesCopied = 0;
-        boolean finished = false;
-        while (!finished) {
-            final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes 
- totalBytesCopied;
-            final Request request = new Request(in, out, requestMax);
-            boolean transferred = false;
-            while (!transferred) {
-                if (shutdown.get()) {
-                    throw new IOException("Throttler shutdown");
-                }
-
-                try {
-                    transferred = requestQueue.offer(request, 1000, 
TimeUnit.MILLISECONDS);
-                } catch (final InterruptedException e) {
-                    throw new IOException("Interrupted", e);
-                }
-            }
-
-            final BlockingQueue<Response> responseQueue = 
request.getResponseQueue();
-            Response response = null;
-            while (response == null) {
-                try {
-                    if (shutdown.get()) {
-                        throw new IOException("Throttler shutdown");
-                    }
-                    response = responseQueue.poll(1000L, 
TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    throw new IOException("Interrupted", e);
-                }
-            }
-
-            if (!response.isSuccess()) {
-                throw response.getError();
-            }
-
-            totalBytesCopied += response.getBytesCopied();
-            finished = (response.getBytesCopied() == 0) || (totalBytesCopied 
>= maxBytes && maxBytes > 0);
-        }
-
-        return totalBytesCopied;
-    }
-
-    /**
-     * This class is responsible for draining water from the leaky bucket. 
I.e., it actually moves the data
-     */
-    private class Drain implements Runnable {
-
-        private final byte[] buffer;
-
-        public Drain() {
-            final int bufferSize = Math.min(4096, maxBytesPerSecond);
-            buffer = new byte[bufferSize];
-        }
-
-        @Override
-        public void run() {
-            final long start = System.currentTimeMillis();
-
-            int bytesTransferred = 0;
-            while (bytesTransferred < maxBytesPerSecond) {
-                final long maxMillisToWait = 1000 - 
(System.currentTimeMillis() - start);
-                if (maxMillisToWait < 1) {
-                    return;
-                }
-
-                try {
-                    final Request request = requestQueue.poll(maxMillisToWait, 
TimeUnit.MILLISECONDS);
-                    if (request == null) {
-                        return;
-                    }
-
-                    final BlockingQueue<Response> responseQueue = 
request.getResponseQueue();
-
-                    final OutputStream out = request.getOutputStream();
-                    final InputStream in = request.getInputStream();
-
-                    try {
-                        final long requestMax = request.getMaxBytesToCopy();
-                        long maxBytesToTransfer;
-                        if (requestMax < 0) {
-                            maxBytesToTransfer = Math.min(buffer.length, 
maxBytesPerSecond - bytesTransferred);
-                        } else {
-                            maxBytesToTransfer = Math.min(requestMax,
-                                    Math.min(buffer.length, maxBytesPerSecond 
- bytesTransferred));
-                        }
-                        maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
-
-                        final int bytesCopied = fillBuffer(in, 
maxBytesToTransfer);
-                        out.write(buffer, 0, bytesCopied);
-
-                        final Response response = new Response(true, 
bytesCopied);
-                        responseQueue.put(response);
-                        bytesTransferred += bytesCopied;
-                    } catch (final IOException e) {
-                        final Response response = new Response(e);
-                        responseQueue.put(response);
-                    }
-                } catch (InterruptedException ignored) {
-                }
-            }
-        }
-
-        private int fillBuffer(final InputStream in, final long maxBytes) 
throws IOException {
-            int bytesRead = 0;
-            int len;
-            while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, 
(int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
-                bytesRead += len;
-            }
-
-            return bytesRead;
-        }
-    }
-
-    private static class Response {
-
-        private final boolean success;
-        private final IOException error;
-        private final int bytesCopied;
-
-        public Response(final boolean success, final int bytesCopied) {
-            this.success = success;
-            this.bytesCopied = bytesCopied;
-            this.error = null;
-        }
-
-        public Response(final IOException error) {
-            this.success = false;
-            this.error = error;
-            this.bytesCopied = -1;
-        }
-
-        public boolean isSuccess() {
-            return success;
-        }
-
-        public IOException getError() {
-            return error;
-        }
-
-        public int getBytesCopied() {
-            return bytesCopied;
-        }
-    }
-
-    private static class Request {
-
-        private final OutputStream out;
-        private final InputStream in;
-        private final long maxBytesToCopy;
-        private final BlockingQueue<Response> responseQueue;
-
-        public Request(final InputStream in, final OutputStream out, final 
long maxBytesToCopy) {
-            this.out = out;
-            this.in = in;
-            this.maxBytesToCopy = maxBytesToCopy;
-            this.responseQueue = new LinkedBlockingQueue<>(1);
-        }
-
-        public BlockingQueue<Response> getResponseQueue() {
-            return this.responseQueue;
-        }
-
-        public OutputStream getOutputStream() {
-            return out;
-        }
-
-        public InputStream getInputStream() {
-            return in;
-        }
-
-        public long getMaxBytesToCopy() {
-            return maxBytesToCopy;
-        }
-
-        @Override
-        public String toString() {
-            return "Request[maxBytes=" + maxBytesToCopy + "]";
-        }
-    }
-
-}
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
deleted file mode 100644
index 9158050d03..0000000000
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public interface StreamThrottler extends Closeable {
-
-    long copy(InputStream in, OutputStream out) throws IOException;
-
-    long copy(InputStream in, OutputStream out, long maxBytes) throws 
IOException;
-
-    InputStream newThrottledInputStream(final InputStream toWrap);
-
-    OutputStream newThrottledOutputStream(final OutputStream toWrap);
-}
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
deleted file mode 100644
index a28c92e1d4..0000000000
--- 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Tests are time-based")
-public class TestLeakyBucketThrottler {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TestLeakyBucketThrottler.class);
-
-    @Test
-    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
-    public void testOutputStreamInterface() throws IOException {
-        // throttle rate at 1 MB/sec
-        final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
-
-        final byte[] data = new byte[1024 * 1024 * 4];
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final OutputStream throttledOut = 
throttler.newThrottledOutputStream(baos)) {
-
-            final long start = System.currentTimeMillis();
-            throttledOut.write(data);
-            throttler.close();
-            final long millis = System.currentTimeMillis() - start;
-            // should take 4 sec give or take
-            assertTrue(millis > 3000);
-            assertTrue(millis < 6000);
-        }
-    }
-
-    @Test
-    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
-    public void testInputStreamInterface() throws IOException {
-
-        final byte[] data = new byte[1024 * 1024 * 4];
-     // throttle rate at 1 MB/sec
-        try ( final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
-                final ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                final InputStream throttledIn = 
throttler.newThrottledInputStream(bais);
-                final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
-
-            final byte[] buffer = new byte[4096];
-            final long start = System.currentTimeMillis();
-            int len;
-            while ((len = throttledIn.read(buffer)) > 0) {
-                baos.write(buffer, 0, len);
-            }
-
-            final long millis = System.currentTimeMillis() - start;
-            // should take 4 sec give or take
-            assertTrue(millis > 3000);
-            assertTrue(millis < 6000);
-        }
-    }
-
-    @Test
-    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
-    public void testDirectInterface() throws IOException, InterruptedException 
{
-        // throttle rate at 1 MB/sec
-        try (final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
-                final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
-            // create 3 threads, each sending ~2 MB
-            final List<Thread> threads = new ArrayList<>();
-            for (int i = 0; i < 3; i++) {
-                final Thread t = new WriterThread(i, throttler, baos);
-                threads.add(t);
-            }
-
-            final long start = System.currentTimeMillis();
-            for (final Thread t : threads) {
-                t.start();
-            }
-
-            for (final Thread t : threads) {
-                t.join();
-            }
-            final long elapsed = System.currentTimeMillis() - start;
-
-            throttler.close();
-
-            // To send 15 MB, it should have taken at least 5 seconds and no 
more than 7 seconds, to
-            // allow for busy-ness and the fact that we could write a tiny bit 
more than the limit.
-            assertTrue(elapsed > 5000);
-            assertTrue(elapsed < 7000);
-
-            // ensure bytes were copied out appropriately
-            assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size());
-            assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]);
-        }
-    }
-
-    private static class WriterThread extends Thread {
-
-        private final int idx;
-        private final byte[] data = new byte[1024 * 1024 * 2 + 1];
-        private final LeakyBucketStreamThrottler throttler;
-        private final OutputStream out;
-
-        public WriterThread(final int idx, final LeakyBucketStreamThrottler 
throttler, final OutputStream out) {
-            this.idx = idx;
-            this.throttler = throttler;
-            this.out = out;
-            this.data[this.data.length - 1] = (byte) 'A';
-        }
-
-        @Override
-        public void run() {
-            long startMillis = System.currentTimeMillis();
-            long bytesWritten = 0L;
-            try {
-                throttler.copy(new ByteArrayInputStream(data), out);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return;
-            }
-            long now = System.currentTimeMillis();
-            long millisElapsed = now - startMillis;
-            bytesWritten += data.length;
-            float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 
1000F;
-            logger.info("{} : copied data at a rate of {} bytes/sec", idx, 
bytesPerSec);
-        }
-    }
-
-}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 4007a5e52f..675120d07e 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -23,9 +23,12 @@ import jakarta.ws.rs.Path;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.documentation.UseCase;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
@@ -37,6 +40,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import 
org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
+import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -55,8 +59,6 @@ import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.ssl.SSLContextProvider;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
 import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
 import org.eclipse.jetty.ee10.servlet.ServletHolder;
 import org.eclipse.jetty.server.Server;
@@ -67,7 +69,6 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
-import java.io.IOException;
 import java.security.KeyStore;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
@@ -93,7 +94,7 @@ import java.util.regex.Pattern;
         + "CONNECT will also result in an error and the HTTP response status 
code 400. "
         + "GET is supported on <service_URI>/healthcheck. If the service is 
available, it returns \"200 OK\" with the content \"OK\". "
         + "The health check functionality can be configured to be accessible 
via a different port. "
-        + "For details see the documentation of the \"Listening Port for 
health check requests\" property."
+        + "For details see the documentation of the \"Listening Port for 
health check requests\" property. "
         + "A Record Reader and Record Writer property can be enabled on the 
processor to process incoming requests as records. "
         + "Record processing is not allowed for multipart requests and request 
in FlowFileV3 format (minifi).")
 @UseCase(
@@ -110,6 +111,31 @@ import java.util.regex.Pattern;
         The MergeContent and PackageFlowFile processors can generate 
FlowFileV3 formatted data.
         """
 )
+@MultiProcessorUseCase(
+        description = "Limit the data flow rate that is accepted",
+        keywords = {"rate", "limit"},
+        notes = """
+            When ListenHTTP cannot output FlowFiles due to back pressure, it 
will send HTTP 503 Service Unavailable
+            response to clients, or deny connections, until more space is 
available in the output queue.
+            """,
+        configurations = {
+            @ProcessorConfiguration(
+                processorClass = ListenHTTP.class,
+                configuration = """
+                    Connect the 'success' relationship of ListenHTTP to a 
ControlRate processor and configure back pressure on that
+                    connection so that a small amount of data will fill the 
queue. The size of the back pressure configuration
+                    determines how much data to buffer to handle spikes in 
rate without affecting clients.
+                    """
+            ),
+            @ProcessorConfiguration(
+                processorClass = ControlRate.class,
+                configuration = """
+                    Use the ControlRate properties to set the desired data 
flow rate limit. When the limit is reached,
+                    the ControlRate input connection will start accumulating 
files. When this connection is full, ListenHTTP
+                    will limit the input data flow rate.
+                    """)
+        }
+)
 public class ListenHTTP extends AbstractSessionFactoryProcessor {
     private static final String MATCH_ALL = ".*";
 
@@ -196,12 +222,6 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         .defaultValue("60 secs")
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .build();
-    public static final PropertyDescriptor MAX_DATA_RATE = new 
PropertyDescriptor.Builder()
-        .name("Max Data to Receive per Second")
-        .description("The maximum amount of data to receive per second; this 
allows the bandwidth to be throttled to a specified data rate; if not 
specified, the data rate is not throttled")
-        .required(false)
-        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-        .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
         .name("SSL Context Service")
         .description("SSL Context Service enables support for HTTPS")
@@ -302,7 +322,6 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
             BASE_PATH,
             PORT,
             HEALTH_CHECK_PORT,
-            MAX_DATA_RATE,
             SSL_CONTEXT_SERVICE,
             HTTP_PROTOCOL_STRATEGY,
             CLIENT_AUTHENTICATION,
@@ -336,7 +355,6 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN = 
"authorityIssuerPattern";
     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = 
"headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
-    public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = 
"streamThrottler";
     public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
     public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
     public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = 
"multipartRequestMaxSize";
@@ -346,7 +364,6 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap 
= new ConcurrentHashMap<>();
     private final AtomicReference<ProcessSessionFactory> 
sessionFactoryReference = new AtomicReference<>();
-    private final AtomicReference<StreamThrottler> throttlerRef = new 
AtomicReference<>();
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
@@ -382,17 +399,17 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @OnStopped
-    public void shutdownHttpServer() {
-        final StreamThrottler throttler = throttlerRef.getAndSet(null);
-        if (throttler != null) {
-            try {
-                throttler.close();
-            } catch (IOException e) {
-                getLogger().error("Failed to close StreamThrottler", e);
-            }
+    @Override
+    public void migrateProperties(PropertyConfiguration config) {
+        super.migrateProperties(config);
+        if (config.removeProperty("Max Data to Receive per Second")) {
+            getLogger().warn("ListenHTTP rate limit feature was removed. 
Please see ListenHTTP documentation for alternatives.");
         }
+    }
 
+    @OnShutdown
+    @OnStopped
+    public void shutdownHttpServer() {
         final Server toShutdown = this.server;
         if (toShutdown == null) {
             return;
@@ -423,14 +440,11 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         
runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY));
         final String basePath = 
context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue();
         final SSLContextProvider sslContextProvider = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
-        final Double maxBytesPerSecond = 
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
-        final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? 
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
         final long requestMaxSize = 
context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
         final int readBufferSize = 
context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final int maxThreadPoolSize = 
context.getProperty(MAX_THREAD_POOL_SIZE).asInteger();
         final int requestHeaderSize = 
context.getProperty(REQUEST_HEADER_MAX_SIZE).asDataSize(DataUnit.B).intValue();
-        throttlerRef.set(streamThrottler);
 
         final PropertyValue clientAuthenticationProperty = 
context.getProperty(CLIENT_AUTHENTICATION);
         final ClientAuthentication clientAuthentication = 
getClientAuthentication(sslContextProvider, clientAuthenticationProperty);
@@ -491,7 +505,6 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, 
Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN, 
Pattern.compile(context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN)
                 .isSet() ? 
context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN).getValue() : MATCH_ALL));
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, 
streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
         
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, 
requestMaxSize);
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index a9b90b82e6..3e032bbfda 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -48,7 +48,6 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.stream.io.StreamThrottler;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFileUnpackager;
 import org.apache.nifi.util.FlowFileUnpackagerV1;
@@ -113,7 +112,6 @@ public class ListenHTTPServlet extends HttpServlet {
     private Pattern authorizedIssuerPattern;
     private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
-    private StreamThrottler streamThrottler;
     private String basePath;
     private int returnCode;
     private long multipartRequestMaxSize;
@@ -133,7 +131,6 @@ public class ListenHTTPServlet extends HttpServlet {
         this.authorizedIssuerPattern = (Pattern) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN);
         this.headerPattern = (Pattern) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
-        this.streamThrottler = (StreamThrottler) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
         this.basePath = (String) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
         this.returnCode = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
         this.multipartRequestMaxSize = (long) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
@@ -148,7 +145,7 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     @Override
-    protected void doHead(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
+    public void doHead(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
         if (request.getLocalPort() == port) {
             response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE);
             response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE);
@@ -159,7 +156,7 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     @Override
-    protected void doPost(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
+    public void doPost(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
 
         if (request.getLocalPort() != port) {
             super.doPost(request, response);
@@ -241,9 +238,7 @@ public class ListenHTTPServlet extends HttpServlet {
             final boolean createHold = 
Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER));
             final String contentType = request.getContentType();
 
-            final InputStream unthrottled = contentGzipped ? new 
GZIPInputStream(request.getInputStream()) : request.getInputStream();
-
-            final InputStream in = (streamThrottler == null) ? unthrottled : 
streamThrottler.newThrottledInputStream(unthrottled);
+            final InputStream in = contentGzipped ? new 
GZIPInputStream(request.getInputStream()) : request.getInputStream();
 
             if (logger.isDebugEnabled()) {
                 logger.debug("Received request from {}, createHold={}, 
content-type={}, gzip={}", request.getRemoteHost(), createHold, contentType, 
contentGzipped);
@@ -328,7 +323,6 @@ public class ListenHTTPServlet extends HttpServlet {
     private Set<FlowFile> handleRequest(final HttpServletRequest request, 
final ProcessSession session, String foundSubject, String foundIssuer,
                                         final boolean destinationIsLegacyNiFi, 
final String contentType, final InputStream in) throws IOException {
         FlowFile flowFile;
-        String holdUuid = null;
         final AtomicBoolean hasMoreData = new AtomicBoolean(false);
         final FlowFileUnpackager unpackager = 
getFlowFileUnpackager(contentType);
 
@@ -366,13 +360,9 @@ public class ListenHTTPServlet extends HttpServlet {
                 attributes.put(CoreAttributes.FILENAME.key(), nameVal);
             }
 
-            String sourceSystemFlowFileIdentifier = 
attributes.get(CoreAttributes.UUID.key());
+            String sourceSystemFlowFileIdentifier = 
attributes.remove(CoreAttributes.UUID.key());
             if (sourceSystemFlowFileIdentifier != null) {
                 sourceSystemFlowFileIdentifier = "urn:nifi:" + 
sourceSystemFlowFileIdentifier;
-
-                // If we receveied a UUID, we want to give the FlowFile a new 
UUID and register the sending system's
-                // identifier as the SourceSystemFlowFileIdentifier field in 
the Provenance RECEIVE event
-                attributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
             }
 
             flowFile = session.putAllAttributes(flowFile, attributes);
@@ -381,9 +371,6 @@ public class ListenHTTPServlet extends HttpServlet {
             session.getProvenanceReporter().receive(flowFile, 
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, details, 
transferMillis);
             flowFileSet.add(flowFile);
 
-            if (holdUuid == null) {
-                holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-            }
         } while (hasMoreData.get());
         return flowFileSet;
     }


Reply via email to