This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e96c85b383 WindowOperatorQueryFrameProcessor: Avoid writing multiple 
frames to output channel in runIncrementally() (#17373)
1e96c85b383 is described below

commit 1e96c85b383fdaff34faad7c3afbe3eb67c5d6c4
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Oct 23 10:34:37 2024 +0530

    WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output 
channel in runIncrementally() (#17373)
    
    WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output 
channel in runIncrementally()
---
 .../WindowOperatorQueryFrameProcessor.java         |  3 +
 .../WindowOperatorQueryFrameProcessorTest.java     | 84 ++++++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 6e91b19df4d..04cdab3b1fe 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -144,6 +144,9 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
 
       if (needToProcessBatch()) {
         runAllOpsOnBatch();
+        if (inputChannel.isFinished()) {
+          return ReturnOrAwait.runAgain();
+        }
         flushAllRowsAndCols();
       }
       return ReturnOrAwait.runAgain();
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
index 02cb02360d9..5d1b350ca92 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
@@ -156,6 +156,90 @@ public class WindowOperatorQueryFrameProcessorTest extends 
FrameProcessorTestBas
     }
   }
 
+  @Test
+  public void testOutputChannelReachingCapacity() throws IOException
+  {
+    // This test validates that we don't end up writing multiple (2) frames to 
the output channel while reading from the input channel,
+    // in the scenario when the input channel has finished and receiver's 
completed() gets called.
+    final ReadableInput factChannel = buildWindowTestInputChannel();
+
+    RowSignature inputSignature = RowSignature.builder()
+                                              .add("cityName", 
ColumnType.STRING)
+                                              .add("added", ColumnType.LONG)
+                                              .build();
+
+    FrameReader frameReader = FrameReader.create(inputSignature);
+
+    RowSignature outputSignature = RowSignature.builder()
+                                               .addAll(inputSignature)
+                                               .add("w0", ColumnType.LONG)
+                                               .build();
+
+    final WindowOperatorQuery query = new WindowOperatorQuery(
+        new QueryDataSource(
+            Druids.newScanQueryBuilder()
+                  .dataSource(new TableDataSource("test"))
+                  .intervals(new LegacySegmentSpec(Intervals.ETERNITY))
+                  .columns("cityName", "added")
+                  
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .context(new HashMap<>())
+                  .build()),
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        new HashMap<>(
+            // This ends up satisfying the criteria of needToProcessBatch() 
method,
+            // so we end up processing the rows we've read, hence writing the 
1st frame to the output channel.
+            
ImmutableMap.of(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 12)
+        ),
+        outputSignature,
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        ),
+        ImmutableList.of()
+    );
+
+    final FrameWriterFactory frameWriterFactory = new 
LimitedFrameWriterFactory(
+        FrameWriters.makeRowBasedFrameWriterFactory(
+            new ArenaMemoryAllocatorFactory(1 << 20),
+            outputSignature,
+            Collections.emptyList(),
+            false
+        ),
+        INPUT_ROWS.size() / 4 // This forces frameWriter's capacity to be 
reached, hence requiring another write.
+    );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+    final WindowOperatorQueryFrameProcessor processor = new 
WindowOperatorQueryFrameProcessor(
+        query,
+        factChannel.getChannel(),
+        outputChannel.writable(),
+        frameWriterFactory,
+        frameReader,
+        new ObjectMapper(),
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        )
+    );
+
+    exec.runFully(processor, null);
+
+    final Sequence<List<Object>> rowsFromProcessor = 
FrameTestUtil.readRowsFromFrameChannel(
+        outputChannel.readable(),
+        FrameReader.create(outputSignature)
+    );
+
+    List<List<Object>> outputRows = rowsFromProcessor.toList();
+    Assert.assertEquals(INPUT_ROWS.size(), outputRows.size());
+
+    for (int i = 0; i < INPUT_ROWS.size(); i++) {
+      Map<String, Object> inputRow = INPUT_ROWS.get(i);
+      List<Object> outputRow = outputRows.get(i);
+
+      Assert.assertEquals("cityName should match", inputRow.get("cityName"), 
outputRow.get(0));
+      Assert.assertEquals("added should match", inputRow.get("added"), 
outputRow.get(1));
+      Assert.assertEquals("row_number() should be correct", (long) i + 1, 
outputRow.get(2));
+    }
+  }
+
   @Test
   public void testProcessorRun() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to