This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 414176fb97 Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. (#12988)
414176fb97 is described below

commit 414176fb97b0bb852feffa11bb1914d4c82e6d64
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Aug 29 18:25:28 2022 -0700

    Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. (#12988)
    
    * Fix accounting of bytesAdded in ReadableByteChunksFrameChannel.
    
    Previously, bytesAdded did not count footer bytes, which could cause
    WorkerInputChannelFactory to get into an infinite loop when reading the
    footer of a frame file.
    
    * Additional tests.
---
 .../druid/frame/channel/ReadableByteChunksFrameChannel.java  |  3 ++-
 .../frame/channel/ReadableByteChunksFrameChannelTest.java    | 12 ++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
index f9acf2d0e4..2d659c2b30 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
@@ -139,12 +139,13 @@ public class ReadableByteChunksFrameChannel implements 
ReadableFrameChannel
 
       try {
         if (chunk.length > 0) {
+          bytesAdded += chunk.length;
+
           if (streamPart != StreamPart.FOOTER) {
             // Footer is discarded: it isn't useful when reading frame files 
as streams. (It contains pointers
             // for random access of frames.)
             chunks.add(Either.value(chunk));
             bytesBuffered += chunk.length;
-            bytesAdded += chunk.length;
           }
 
           updateStreamState();
diff --git 
a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
 
b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
index 2040c086c8..a6e95f34ab 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
@@ -108,6 +108,7 @@ public class ReadableByteChunksFrameChannelTest
       final ReadableByteChunksFrameChannel channel = 
ReadableByteChunksFrameChannel.create("test");
       channel.addChunk(Files.toByteArray(file));
       channel.doneWriting();
+      Assert.assertEquals(file.length(), channel.getBytesAdded());
 
       while (channel.canRead()) {
         Assert.assertFalse(channel.isFinished());
@@ -145,6 +146,7 @@ public class ReadableByteChunksFrameChannelTest
       final ReadableByteChunksFrameChannel channel = 
ReadableByteChunksFrameChannel.create("test");
       channel.addChunk(truncatedFile);
       channel.doneWriting();
+      Assert.assertEquals(truncatedFile.length, channel.getBytesAdded());
 
       Assert.assertTrue(channel.canRead());
       Assert.assertFalse(channel.isFinished());
@@ -187,8 +189,11 @@ public class ReadableByteChunksFrameChannelTest
       final byte[] chunk1 = new byte[errorAtBytePosition];
       System.arraycopy(fileBytes, 0, chunk1, 0, chunk1.length);
       channel.addChunk(chunk1);
+      Assert.assertEquals(chunk1.length, channel.getBytesAdded());
+
       channel.setError(new ISE("Test error!"));
       channel.doneWriting();
+      Assert.assertEquals(chunk1.length, channel.getBytesAdded());
 
       expectedException.expect(IllegalStateException.class);
       expectedException.expectMessage("Test error!");
@@ -250,13 +255,17 @@ public class ReadableByteChunksFrameChannelTest
       final ReadableByteChunksFrameChannel channel = 
ReadableByteChunksFrameChannel.create("test");
       ListenableFuture<?> firstBackpressureFuture = null;
 
+      long totalSize = 0;
       Assert.assertEquals(0, channel.getBytesBuffered());
 
       try (final Chunker chunker = new Chunker(new FileInputStream(file), 
chunkSize)) {
         byte[] chunk;
 
         while ((chunk = chunker.nextChunk()) != null) {
+          totalSize += chunk.length;
+
           final ListenableFuture<?> backpressureFuture = 
channel.addChunk(chunk);
+          Assert.assertEquals(channel.getBytesAdded(), totalSize);
 
           // Minimally-sized channel means backpressure is exerted as soon as 
a single frame is available.
           Assert.assertEquals(channel.canRead(), backpressureFuture != null);
@@ -303,6 +312,7 @@ public class ReadableByteChunksFrameChannelTest
       ListenableFuture<?> backpressureFuture = null;
 
       int iteration = 0;
+      long totalSize = 0;
 
       try (final Chunker chunker = new Chunker(new FileInputStream(file), 
chunkSize)) {
         byte[] chunk;
@@ -327,9 +337,11 @@ public class ReadableByteChunksFrameChannelTest
           }
 
           iteration++;
+          totalSize += chunk.length;
 
           // Write next chunk.
           final ListenableFuture<?> addVal = channel.addChunk(chunk);
+          Assert.assertEquals(totalSize, channel.getBytesAdded());
 
           // Minimally-sized channel means backpressure is exerted as soon as 
a single frame is available.
           Assert.assertEquals(channel.canRead(), addVal != null);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to