This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 212b85f6bf9 [improve][broker] Optimize gzip compression for /metrics 
endpoint by sharing/caching compressed result (#22521)
212b85f6bf9 is described below

commit 212b85f6bf90f2e0b8232567993e43ea86525457
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 17 03:15:01 2024 -0700

    [improve][broker] Optimize gzip compression for /metrics endpoint by 
sharing/caching compressed result (#22521)
    
    (cherry picked from commit 94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47)
    
    # Conflicts:
    #       
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
---
 .../stats/prometheus/PrometheusMetricsServlet.java |   1 +
 .../apache/pulsar/broker/web/GzipHandlerUtil.java  |  21 +++
 .../pulsar/broker/web/GzipHandlerUtilTest.java     |  36 +++++
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +-
 .../prometheus/PrometheusMetricsGenerator.java     | 176 +++++++++++++++++++--
 .../prometheus/PulsarPrometheusMetricsServlet.java |  28 +++-
 .../apache/pulsar/PrometheusMetricsTestUtil.java   |   2 +-
 7 files changed, 253 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 8a41bed29d4..8685348174c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
+    public static final String DEFAULT_METRICS_PATH = "/metrics";
     private static final long serialVersionUID = 1L;
     static final int HTTP_STATUS_OK_200 = 200;
     static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
index 37c9c05e5d5..9e980cecb79 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.broker.web;
 
 import java.util.List;
+import org.eclipse.jetty.http.pathmap.PathSpecSet;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.util.IncludeExclude;
 
 public class GzipHandlerUtil {
     public static Handler wrapWithGzipHandler(Handler innerHandler, 
List<String> gzipCompressionExcludedPaths) {
@@ -45,4 +47,23 @@ public class GzipHandlerUtil {
                 && (gzipCompressionExcludedPaths.get(0).equals("^.*")
                 || gzipCompressionExcludedPaths.get(0).equals("^.*$"));
     }
+
+    /**
+     * Check if GZIP compression is enabled for the given endpoint.
+     * @param gzipCompressionExcludedPaths list of paths that should not be 
compressed
+     * @param endpoint the endpoint to check
+     * @return true if GZIP compression is enabled for the endpoint, false 
otherwise
+     */
+    public static boolean isGzipCompressionEnabledForEndpoint(List<String> 
gzipCompressionExcludedPaths,
+                                                              String endpoint) 
{
+        if (gzipCompressionExcludedPaths == null || 
gzipCompressionExcludedPaths.isEmpty()) {
+            return true;
+        }
+        if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) 
{
+            return false;
+        }
+        IncludeExclude<String> paths = new IncludeExclude<>(PathSpecSet.class);
+        paths.exclude(gzipCompressionExcludedPaths.toArray(new String[0]));
+        return paths.test(endpoint);
+    }
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
new file mode 100644
index 00000000000..d6958695dec
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.*;
+import java.util.Arrays;
+import org.testng.annotations.Test;
+
+public class GzipHandlerUtilTest {
+
+    /**
+     * Checks the gzip eligibility decision for a missing exclusion list, catch-all patterns,
+     * an exact exclusion and unrelated exclusions.
+     */
+    @Test
+    public void testIsGzipCompressionEnabledForEndpoint() {
+        final String metricsPath = "/metrics";
+        // a missing exclusion list leaves compression enabled
+        assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(null, metricsPath));
+        // each catch-all pattern disables compression
+        for (String catchAllPattern : Arrays.asList("^.*", "^.*$")) {
+            assertFalse(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(
+                    Arrays.asList(catchAllPattern), metricsPath));
+        }
+        // an exact exclusion only affects the matching path
+        assertFalse(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList(metricsPath), metricsPath));
+        assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList(metricsPath), "/metrics2"));
+        // exclusions for other paths leave the metrics endpoint compressed
+        assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("/admin", "/custom"),
+                metricsPath));
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f1640bff581..93b7efc0243 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -107,6 +107,7 @@ import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import 
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
@@ -1013,7 +1014,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 true, attributeMap, true, Topics.class);
 
         // Add metrics servlet
-        webService.addServlet("/metrics",
+        webService.addServlet(PrometheusMetricsServlet.DEFAULT_METRICS_PATH,
                 new ServletHolder(metricsServlet),
                 config.isAuthenticateMetricsEndpoint(), attributeMap);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 4c00442ab5b..bfcbb5ec89d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.CharBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Clock;
@@ -44,6 +46,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.zip.CRC32;
+import java.util.zip.Deflater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
@@ -73,7 +77,7 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
     private volatile boolean closed;
 
     public static class MetricsBuffer {
-        private final CompletableFuture<ByteBuf> bufferFuture;
+        private final CompletableFuture<ResponseBuffer> bufferFuture;
         private final long createTimeslot;
         private final AtomicInteger refCnt = new AtomicInteger(2);
 
@@ -82,7 +86,7 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
             createTimeslot = timeslot;
         }
 
-        public CompletableFuture<ByteBuf> getBufferFuture() {
+        public CompletableFuture<ResponseBuffer> getBufferFuture() {
             return bufferFuture;
         }
 
@@ -114,6 +118,151 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
         }
     }
 
+    /**
+     * A wraps the response buffer and asynchronously provides a gzip 
compressed buffer when requested.
+     */
+    public static class ResponseBuffer {
+        private final ByteBuf uncompressedBuffer;
+        private boolean released = false;
+        private CompletableFuture<ByteBuf> compressedBuffer;
+
+        private ResponseBuffer(final ByteBuf uncompressedBuffer) {
+            this.uncompressedBuffer = uncompressedBuffer;
+        }
+
+        public ByteBuf getUncompressedBuffer() {
+            return uncompressedBuffer;
+        }
+
+        public synchronized CompletableFuture<ByteBuf> 
getCompressedBuffer(Executor executor) {
+            if (released) {
+                throw new IllegalStateException("Already released!");
+            }
+            if (compressedBuffer == null) {
+                compressedBuffer = new CompletableFuture<>();
+                ByteBuf retainedDuplicate = 
uncompressedBuffer.retainedDuplicate();
+                executor.execute(() -> {
+                    try {
+                        compressedBuffer.complete(compress(retainedDuplicate));
+                    } catch (Exception e) {
+                        compressedBuffer.completeExceptionally(e);
+                    } finally {
+                        retainedDuplicate.release();
+                    }
+                });
+            }
+            return compressedBuffer;
+        }
+
+        private ByteBuf compress(ByteBuf uncompressedBuffer) {
+            GzipByteBufferWriter gzipByteBufferWriter = new 
GzipByteBufferWriter(uncompressedBuffer.alloc(),
+                    uncompressedBuffer.readableBytes());
+            return gzipByteBufferWriter.compress(uncompressedBuffer);
+        }
+
+        public synchronized void release() {
+            released = true;
+            uncompressedBuffer.release();
+            if (compressedBuffer != null) {
+                compressedBuffer.whenComplete((byteBuf, throwable) -> {
+                    if (byteBuf != null) {
+                        byteBuf.release();
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Compress input nio buffers into gzip format with output in a Netty composite ByteBuf.
+     * An instance is single-use: {@link #compress(ByteBuf)} ends the deflater when it returns.
+     */
+    private static class GzipByteBufferWriter {
+        // gzip member header: magic bytes, deflate method, no flags, no mtime, no extra flags, unknown OS
+        private static final byte[] GZIP_HEADER =
+                new byte[] {(byte) 0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
+        // gzip trailer: CRC32 checksum followed by the uncompressed size, both 32-bit integers
+        private static final int GZIP_TRAILER_SIZE = 2 * Integer.BYTES;
+        private final ByteBufAllocator bufAllocator;
+        private final Deflater deflater;
+        private final CRC32 crc;
+        private final int bufferSize;
+        private final CompositeByteBuf resultBuffer;
+        private ByteBuf backingCompressBuffer;
+        private ByteBuffer compressBuffer;
+
+        GzipByteBufferWriter(ByteBufAllocator bufAllocator, int readableBytes) {
+            // nowrap=true produces a raw deflate stream; the gzip header and trailer are written by this class
+            deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+            crc = new CRC32();
+            this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
+            this.bufAllocator = bufAllocator;
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
+            allocateBuffer();
+        }
+
+        /**
+         * Compress the input Netty buffer and append it to the result buffer in gzip format.
+         * @param uncompressedBuffer the buffer whose readable bytes are compressed
+         * @return the composite buffer holding the complete gzip stream; the caller must release it
+         */
+        public ByteBuf compress(ByteBuf uncompressedBuffer) {
+            boolean completed = false;
+            try {
+                ByteBuffer[] nioBuffers = uncompressedBuffer.nioBuffers();
+                if (nioBuffers.length == 0) {
+                    // no input chunks at all: still emit a valid (empty) gzip stream
+                    compressAndAppend(ByteBuffer.allocate(0), true, true);
+                }
+                for (int i = 0, nioBuffersLength = nioBuffers.length; i < nioBuffersLength; i++) {
+                    ByteBuffer nioBuffer = nioBuffers[i];
+                    compressAndAppend(nioBuffer, i == 0, i == nioBuffersLength - 1);
+                }
+                completed = true;
+                return resultBuffer;
+            } finally {
+                close();
+                if (!completed) {
+                    // compression failed: nobody receives the result, so release what was collected so far
+                    resultBuffer.release();
+                }
+            }
+        }
+
+        private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean isLast) {
+            if (isFirst) {
+                // write gzip header
+                compressBuffer.put(GZIP_HEADER);
+            }
+            // update the checksum without consuming the input buffer
+            nioBuffer.mark();
+            crc.update(nioBuffer);
+            nioBuffer.reset();
+            deflater.setInput(nioBuffer);
+            if (isLast) {
+                deflater.finish();
+            }
+            // Intermediate buffers are done when the deflater asks for more input. The last buffer is
+            // done only when the deflater reports finished(): needsInput() already returns true once
+            // the input is consumed, although pending output may still have to be drained.
+            while (isLast ? !deflater.finished() : !deflater.needsInput()) {
+                if (!compressBuffer.hasRemaining()) {
+                    appendCompressBuffer();
+                    allocateBuffer();
+                }
+                deflater.deflate(compressBuffer);
+            }
+            if (isLast) {
+                // make sure that the trailer fits into the current buffer
+                if (compressBuffer.remaining() < GZIP_TRAILER_SIZE) {
+                    appendCompressBuffer();
+                    allocateBuffer();
+                }
+                // write gzip footer, integer values are in little endian byte order
+                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
+                // write CRC32 checksum
+                compressBuffer.putInt((int) crc.getValue());
+                // write uncompressed size
+                compressBuffer.putInt(deflater.getTotalIn());
+                // append the last compressed buffer
+                appendCompressBuffer();
+            }
+        }
+
+        /** Hands the current compress buffer over to the result buffer. */
+        private void appendCompressBuffer() {
+            backingCompressBuffer.setIndex(0, compressBuffer.position());
+            ByteBuf filledBuffer = backingCompressBuffer;
+            // drop our references first: the composite buffer owns the component from now on,
+            // so close() must not release it a second time
+            backingCompressBuffer = null;
+            compressBuffer = null;
+            resultBuffer.addComponent(true, filledBuffer);
+        }
+
+        private void allocateBuffer() {
+            backingCompressBuffer = bufAllocator.directBuffer(bufferSize);
+            compressBuffer = backingCompressBuffer.nioBuffer(0, bufferSize);
+        }
+
+        /** Ends the deflater and releases the compress buffer if it was not handed over to the result. */
+        private void close() {
+            deflater.end();
+            if (backingCompressBuffer != null) {
+                backingCompressBuffer.release();
+                backingCompressBuffer = null;
+                compressBuffer = null;
+            }
+        }
+    }
+
     private final PulsarService pulsar;
     private final boolean includeTopicMetrics;
     private final boolean includeConsumerMetrics;
@@ -188,13 +337,7 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
         // use composite buffer with pre-allocated buffers to ensure that the 
pooled allocator can be used
         // for allocating the buffers
         ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
-        int chunkSize;
-        if (byteBufAllocator instanceof PooledByteBufAllocator) {
-            PooledByteBufAllocator pooledByteBufAllocator = 
(PooledByteBufAllocator) byteBufAllocator;
-            chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(), 
DEFAULT_INITIAL_BUFFER_SIZE);
-        } else {
-            chunkSize = DEFAULT_INITIAL_BUFFER_SIZE;
-        }
+        int chunkSize = resolveChunkSize(byteBufAllocator);
         CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
                 Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize / 
chunkSize) + 1));
         int totalLen = 0;
@@ -205,6 +348,17 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
         return buf;
     }
 
+    /**
+     * Resolves the buffer chunk size for the given allocator: the chunk size of a pooled allocator,
+     * but never less than DEFAULT_INITIAL_BUFFER_SIZE, which is also used for any other allocator.
+     */
+    private static int resolveChunkSize(ByteBufAllocator byteBufAllocator) {
+        if (!(byteBufAllocator instanceof PooledByteBufAllocator)) {
+            return DEFAULT_INITIAL_BUFFER_SIZE;
+        }
+        int pooledChunkSize = ((PooledByteBufAllocator) byteBufAllocator).metric().chunkSize();
+        return Math.max(pooledChunkSize, DEFAULT_INITIAL_BUFFER_SIZE);
+    }
+
     private static void generateBrokerBasicMetrics(PulsarService pulsar, 
SimpleTextOutputStream stream) {
         String clusterName = pulsar.getConfiguration().getClusterName();
         // generate managedLedgerCache metrics
@@ -336,10 +490,10 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
                     if (currentMetricsBuffer != null) {
                         currentMetricsBuffer.release();
                     }
-                    CompletableFuture<ByteBuf> bufferFuture = 
newMetricsBuffer.getBufferFuture();
+                    CompletableFuture<ResponseBuffer> bufferFuture = 
newMetricsBuffer.getBufferFuture();
                     executor.execute(() -> {
                         try {
-                            bufferFuture.complete(generate0(metricsProviders));
+                            bufferFuture.complete(new 
ResponseBuffer(generate0(metricsProviders)));
                         } catch (Exception e) {
                             bufferFuture.completeExceptionally(e);
                         } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
index 7fcc74e965c..43514d481dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import static 
org.apache.pulsar.broker.web.GzipHandlerUtil.isGzipCompressionEnabledForEndpoint;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Clock;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.servlet.AsyncContext;
@@ -40,6 +43,7 @@ public class PulsarPrometheusMetricsServlet extends 
PrometheusMetricsServlet {
     private static final int EXECUTOR_MAX_THREADS = 4;
 
     private final PrometheusMetricsGenerator prometheusMetricsGenerator;
+    private final boolean gzipCompressionEnabledForMetrics;
 
     public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics,
                                           boolean includeConsumerMetrics, 
boolean includeProducerMetrics,
@@ -50,6 +54,8 @@ public class PulsarPrometheusMetricsServlet extends 
PrometheusMetricsServlet {
         prometheusMetricsGenerator =
                 new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, 
includeConsumerMetrics,
                         includeProducerMetrics, splitTopicAndPartitionLabel, 
Clock.systemUTC());
+        gzipCompressionEnabledForMetrics = isGzipCompressionEnabledForEndpoint(
+                
pulsar.getConfiguration().getHttpServerGzipCompressionExcludedPaths(), 
DEFAULT_METRICS_PATH);
     }
 
 
@@ -100,7 +106,14 @@ public class PulsarPrometheusMetricsServlet extends 
PrometheusMetricsServlet {
             context.complete();
             return;
         }
-        metricsBuffer.getBufferFuture().whenComplete((buffer, ex) -> 
executor.execute(() -> {
+        boolean compressOutput = gzipCompressionEnabledForMetrics && 
isGzipAccepted(request);
+        metricsBuffer.getBufferFuture().thenCompose(responseBuffer -> {
+            if (compressOutput) {
+                return responseBuffer.getCompressedBuffer(executor);
+            } else {
+                return 
CompletableFuture.completedFuture(responseBuffer.getUncompressedBuffer());
+            }
+        }).whenComplete((buffer, ex) -> executor.execute(() -> {
             try {
                 long elapsedNanos = System.nanoTime() - startNanos;
                 // check if the request has been timed out, implement a soft 
timeout
@@ -133,6 +146,9 @@ public class PulsarPrometheusMetricsServlet extends 
PrometheusMetricsServlet {
                 } else {
                     response.setStatus(HTTP_STATUS_OK_200);
                     response.setContentType("text/plain;charset=utf-8");
+                    if (compressOutput) {
+                        response.setHeader("Content-Encoding", "gzip");
+                    }
                     ServletOutputStream outputStream = 
response.getOutputStream();
                     if (outputStream instanceof HttpOutput) {
                         HttpOutput output = (HttpOutput) outputStream;
@@ -156,4 +172,14 @@ public class PulsarPrometheusMetricsServlet extends 
PrometheusMetricsServlet {
             }
         }));
     }
+
+    /**
+     * Checks whether the client accepts a gzip encoded response.
+     * The Accept-Encoding header is split into its comma separated elements; gzip is accepted when an
+     * element names the gzip coding (case-insensitive), optionally followed by parameters such as
+     * {@code ;q=0.8}. A quality value of 0 (or one that cannot be parsed) means "not acceptable".
+     * @param request the incoming request
+     * @return true if the request declares gzip as an acceptable content coding
+     */
+    private boolean isGzipAccepted(HttpServletRequest request) {
+        String acceptEncoding = request.getHeader("Accept-Encoding");
+        if (acceptEncoding == null) {
+            return false;
+        }
+        return Arrays.stream(acceptEncoding.split(","))
+                .anyMatch(PulsarPrometheusMetricsServlet::isAcceptableGzipElement);
+    }
+
+    /** Checks a single Accept-Encoding element, e.g. "gzip" or "gzip;q=0.5". */
+    private static boolean isAcceptableGzipElement(String element) {
+        String trimmed = element.trim();
+        int parametersStart = trimmed.indexOf(';');
+        String coding = parametersStart < 0 ? trimmed : trimmed.substring(0, parametersStart).trim();
+        if (!"gzip".equalsIgnoreCase(coding)) {
+            return false;
+        }
+        if (parametersStart < 0) {
+            return true;
+        }
+        for (String parameter : trimmed.substring(parametersStart + 1).split(";")) {
+            String candidate = parameter.trim();
+            if (candidate.regionMatches(true, 0, "q=", 0, 2)) {
+                try {
+                    return Double.parseDouble(candidate.substring(2)) > 0;
+                } catch (NumberFormatException e) {
+                    // malformed weight: fall back to the uncompressed response, which is always valid
+                    return false;
+                }
+            }
+        }
+        // parameters without a weight don't restrict the coding
+        return true;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
index fcc3b6aa88f..68826372b7b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
@@ -55,7 +55,7 @@ public class PrometheusMetricsTestUtil {
         try {
             ByteBuf buffer = null;
             try {
-                buffer = metricsBuffer.getBufferFuture().get(5, 
TimeUnit.SECONDS);
+                buffer = metricsBuffer.getBufferFuture().get(5, 
TimeUnit.SECONDS).getUncompressedBuffer();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new IOException(e);

Reply via email to