This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 46cbb334283 FrameChannelMerger: Fix incorrect behavior of finished(). (#17088)
46cbb334283 is described below

commit 46cbb33428368d676203ec6cdb024432fe378390
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 17 08:35:54 2024 -0700

    FrameChannelMerger: Fix incorrect behavior of finished(). (#17088)
    
    Previously, the processor used "remainingChannels" to track the number of
    non-null entries of "currentFrames". Now, "remainingChannels" tracks the
    set of channels that are unfinished.
    
    The difference is subtle. In the previous code, when an input channel
    was blocked upon exiting nextFrame(), the "currentFrames" entry would be
    null, and therefore the "remainingChannels" variable would be decremented.
    After the next await and call to populateCurrentFramesAndTournamentTree(),
    "remainingChannels" would be incremented if the channel had become
    unblocked after awaiting.
    
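    This means that finished(), which returned true if remainingChannels was
    zero, would not be reliable if called between nextFrame() and the
    next await + populateCurrentFramesAndTournamentTree(): if every channel
    was momentarily blocked, all "currentFrames" entries were null, so
    finished() could return true even though input remained to be read.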
    
    This patch changes things such that finished() is always reliable. This
    fixes a regression introduced in PR #16911, which added a call to
    finished() that was, at that time, unsafe.
---
 .../druid/frame/processor/FrameChannelMerger.java  | 27 ++++---
 .../druid/frame/processor/SuperSorterTest.java     | 47 +++++++++++-
 ...AlwaysAsyncPartitionedReadableFrameChannel.java | 51 +++++++++++++
 .../test/AlwaysAsyncReadableFrameChannel.java      | 85 ++++++++++++++++++++++
 4 files changed, 198 insertions(+), 12 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java
index e806f98b926..662fd001b02 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java
@@ -19,8 +19,10 @@
 
 package org.apache.druid.frame.processor;
 
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntSets;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -68,7 +70,11 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
   private final long rowLimit;
   private long rowsOutput = 0;
   private int currentPartition = 0;
-  private int remainingChannels;
+
+  /**
+   * Channels that still have input to read.
+   */
+  private final IntSet remainingChannels;
 
   // ColumnSelectorFactory that always reads from the current row in the 
merged sequence.
   final MultiColumnSelectorFactory mergedColumnSelectorFactory;
@@ -119,7 +125,7 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
     this.partitions = partitionsToUse;
     this.rowLimit = rowLimit;
     this.currentFrames = new FramePlus[inputChannels.size()];
-    this.remainingChannels = 0;
+    this.remainingChannels = new IntAVLTreeSet(IntSets.fromTo(0, 
inputChannels.size()));
     this.tournamentTree = new TournamentTree(
         inputChannels.size(),
         (k1, k2) -> {
@@ -241,7 +247,7 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
         if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) {
           // Limit reached; we're done.
           Arrays.fill(currentFrames, null);
-          remainingChannels = 0;
+          remainingChannels.clear();
         } else {
           // Continue reading the currentChannel.
           final FramePlus channelFramePlus = currentFrames[currentChannel];
@@ -251,7 +257,6 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
             // Done reading current frame from "channel".
             // Clear it and see if there is another one available for 
immediate loading.
             currentFrames[currentChannel] = null;
-            remainingChannels--;
 
             final ReadableFrameChannel channel = 
inputChannels.get(currentChannel);
 
@@ -265,10 +270,10 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
                 break;
               } else {
                 currentFrames[currentChannel] = framePlus;
-                remainingChannels++;
               }
             } else if (channel.isFinished()) {
               // Done reading this channel. Fall through and continue with 
other channels.
+              remainingChannels.remove(currentChannel);
             } else {
               // Nothing available, not finished; we can't continue. Finish up 
the current frame and return it.
               break;
@@ -282,9 +287,12 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
     }
   }
 
+  /**
+   * Returns whether all input is done being read: true once every input channel has been removed from
+   * {@code remainingChannels}, either because it was observed {@link ReadableFrameChannel#isFinished()} or because
+   * the row limit was reached and the set was cleared.
+   *
+   * This does not depend on whether {@code currentFrames} entries are populated. A channel that is merely blocked
+   * has a null entry but stays in the set, so the result remains accurate between nextFrame() and the next await.
+   */
   private boolean finished()
   {
-    return remainingChannels == 0;
+    return remainingChannels.isEmpty();
   }
 
   @Override
@@ -302,7 +310,7 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
     final IntSet await = new IntOpenHashSet();
 
     for (int i = 0; i < inputChannels.size(); i++) {
-      if (currentFrames[i] == null) {
+      if (currentFrames[i] == null && remainingChannels.contains(i)) {
         final ReadableFrameChannel channel = inputChannels.get(i);
 
         if (channel.canRead()) {
@@ -312,9 +320,10 @@ public class FrameChannelMerger implements 
FrameProcessor<Long>
             await.add(i);
           } else {
             currentFrames[i] = framePlus;
-            remainingChannels++;
           }
-        } else if (!channel.isFinished()) {
+        } else if (channel.isFinished()) {
+          remainingChannels.remove(i);
+        } else {
           await.add(i);
         }
       }
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
index 7a885af49c5..80e7f6352d0 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
@@ -38,6 +38,8 @@ import org.apache.druid.frame.key.KeyOrder;
 import org.apache.druid.frame.key.KeyTestUtils;
 import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
+import 
org.apache.druid.frame.processor.test.AlwaysAsyncPartitionedReadableFrameChannel;
+import org.apache.druid.frame.processor.test.AlwaysAsyncReadableFrameChannel;
 import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.testutil.FrameSequenceBuilder;
 import org.apache.druid.frame.testutil.FrameTestUtil;
@@ -434,8 +436,8 @@ public class SuperSorterTest
           clusterByPartitionsFuture,
           exec,
           FrameProcessorDecorator.NONE,
-          new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null),
-          outputChannelFactory,
+          makeOutputChannelFactory(new FileOutputChannelFactory(tempFolder, 
maxBytesPerFrame, null)),
+          makeOutputChannelFactory(outputChannelFactory),
           maxActiveProcessors,
           maxChannelsPerProcessor,
           limitHint,
@@ -839,9 +841,48 @@ public class SuperSorterTest
 
     for (final BlockingQueueFrameChannel channel : channels) {
       channel.writable().close();
-      retVal.add(channel.readable());
+      retVal.add(new AlwaysAsyncReadableFrameChannel(channel.readable()));
     }
 
     return retVal;
   }
+
+  /**
+   * Wraps an underlying {@link OutputChannelFactory} in one that uses {@link AlwaysAsyncReadableFrameChannel}
+   * for all of its readable channels. This helps catch bugs due to improper usage of {@link ReadableFrameChannel}
+   * methods that enable async reads.
+   *
+   * Writable channels and frame memory allocators are passed through from the base factory unchanged; only the
+   * readable side is wrapped. Channels from {@link OutputChannelFactory#openNilChannel(int)} are returned as-is.
+   *
+   * @param baseFactory factory that actually creates the channels
+   * @return a factory whose readable channels force the async read protocol
+   */
+  private static OutputChannelFactory makeOutputChannelFactory(final 
OutputChannelFactory baseFactory)
+  {
+    return new OutputChannelFactory() {
+      @Override
+      public OutputChannel openChannel(int partitionNumber) throws IOException
+      {
+        final OutputChannel channel = baseFactory.openChannel(partitionNumber);
+        // The readable wrapper is created lazily, each time the readable channel supplier is invoked.
+        return OutputChannel.pair(
+            channel.getWritableChannel(),
+            channel.getFrameMemoryAllocator(),
+            () -> new 
AlwaysAsyncReadableFrameChannel(channel.getReadableChannelSupplier().get()),
+            channel.getPartitionNumber()
+        );
+      }
+
+      @Override
+      public PartitionedOutputChannel openPartitionedChannel(String name, 
boolean deleteAfterRead) throws IOException
+      {
+        final PartitionedOutputChannel channel = 
baseFactory.openPartitionedChannel(name, deleteAfterRead);
+        // Per-partition readers obtained through this wrapper are each wrapped in AlwaysAsyncReadableFrameChannel.
+        return PartitionedOutputChannel.pair(
+            channel.getWritableChannel(),
+            channel.getFrameMemoryAllocator(),
+            () -> new 
AlwaysAsyncPartitionedReadableFrameChannel(channel.getReadableChannelSupplier().get())
+        );
+      }
+
+      @Override
+      public OutputChannel openNilChannel(int partitionNumber)
+      {
+        // Not wrapped. NOTE(review): presumably a nil channel has no frames to read, so forcing async reads
+        // would add no coverage — confirm.
+        return baseFactory.openNilChannel(partitionNumber);
+      }
+    };
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java
new file mode 100644
index 00000000000..4013889df6e
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link PartitionedReadableFrameChannel} that wraps every per-partition channel it hands out in
+ * {@link AlwaysAsyncReadableFrameChannel}, forcing callers to use the async read protocol. Test utility.
+ */
+public class AlwaysAsyncPartitionedReadableFrameChannel implements 
PartitionedReadableFrameChannel
+{
+  /**
+   * Underlying channel that supplies the actual per-partition readers.
+   */
+  private final PartitionedReadableFrameChannel delegate;
+
+  /**
+   * @param delegate channel whose per-partition readers will be wrapped
+   */
+  public 
AlwaysAsyncPartitionedReadableFrameChannel(PartitionedReadableFrameChannel 
delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  /**
+   * Returns the delegate's channel for {@code partitionNumber}, wrapped in a new
+   * {@link AlwaysAsyncReadableFrameChannel}. Each call creates a fresh wrapper, so deferral state is not shared
+   * between separate calls for the same partition.
+   */
+  @Override
+  public ReadableFrameChannel getReadableFrameChannel(int partitionNumber)
+  {
+    return new 
AlwaysAsyncReadableFrameChannel(delegate.getReadableFrameChannel(partitionNumber));
+  }
+
+  /**
+   * Closes the delegate. This class does not track the wrappers returned by
+   * {@link #getReadableFrameChannel(int)}; it only closes the underlying partitioned channel.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    delegate.close();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java
new file mode 100644
index 00000000000..8ff10aeb7b0
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.java.util.common.ISE;
+
+/**
+ * Wraps an underlying channel and forces an async style of reading. After each call to {@link #read()}, the
+ * {@link #canRead()} and {@link #isFinished()} methods return false until {@link #readabilityFuture()} is called.
+ * Calling {@link #read()} again before then throws an {@link ISE}.
+ *
+ * Test utility: it surfaces callers that read repeatedly, or rely on {@link #isFinished()}, without first going
+ * through {@link #readabilityFuture()}.
+ *
+ * NOTE(review): {@code defer} is a plain field with no synchronization, so this class is not thread-safe; it
+ * presumably assumes a single reading thread — confirm.
+ */
+public class AlwaysAsyncReadableFrameChannel implements ReadableFrameChannel
+{
+  private final ReadableFrameChannel delegate;
+
+  /**
+   * True after {@link #read()} and until the next {@link #readabilityFuture()} or {@link #close()} call. While set,
+   * the channel reports itself as neither readable nor finished.
+   */
+  private boolean defer;
+
+  /**
+   * @param delegate channel that actually supplies frames
+   */
+  public AlwaysAsyncReadableFrameChannel(ReadableFrameChannel delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  /**
+   * Returns false while deferred; otherwise delegates to the underlying channel.
+   */
+  @Override
+  public boolean isFinished()
+  {
+    if (defer) {
+      return false;
+    }
+
+    return delegate.isFinished();
+  }
+
+  /**
+   * Returns false while deferred; otherwise delegates to the underlying channel.
+   */
+  @Override
+  public boolean canRead()
+  {
+    if (defer) {
+      return false;
+    }
+
+    return delegate.canRead();
+  }
+
+  /**
+   * Reads the next frame from the underlying channel and enters the deferred state.
+   *
+   * @throws ISE if called while deferred, i.e. after a previous read() with no intervening
+   *             {@link #readabilityFuture()} call
+   */
+  @Override
+  public Frame read()
+  {
+    if (defer) {
+      throw new ISE("Cannot call read() while deferred");
+    }
+
+    // Set before delegating, so the wrapper stays deferred even if the underlying read() throws.
+    defer = true;
+    return delegate.read();
+  }
+
+  /**
+   * Leaves the deferred state and returns the underlying channel's readability future. The flag is cleared as soon
+   * as this method is called, not when the returned future resolves.
+   */
+  @Override
+  public ListenableFuture<?> readabilityFuture()
+  {
+    defer = false;
+    return delegate.readabilityFuture();
+  }
+
+  /**
+   * Clears the deferred state and closes the underlying channel.
+   */
+  @Override
+  public void close()
+  {
+    defer = false;
+    delegate.close();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to