This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b093294791 Fixes an issue with AppendableMemory that can cause MSQ 
jobs to fail (#17369)
5b093294791 is described below

commit 5b09329479117a54f68d5ce9e4c8b245d76a8f79
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Oct 18 09:05:53 2024 +0530

    Fixes an issue with AppendableMemory that can cause MSQ jobs to fail 
(#17369)
---
 .../org/apache/druid/msq/exec/MSQWindowTest.java   |  2 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  4 ++
 .../druid/frame/allocation/AppendableMemory.java   | 11 +++-
 .../frame/allocation/AppendableMemoryTest.java     | 73 ++++++++++++++++++++++
 .../apache/druid/frame/write/FrameWriterTest.java  | 16 +++--
 5 files changed, 98 insertions(+), 8 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
index effca1b06f7..aa258859b50 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
@@ -1805,7 +1805,7 @@ public class MSQWindowTest extends MSQTestBase
         .setSql(
             "select cityName, added, SUM(added) OVER () cc from wikipedia")
         .setQueryContext(customContext)
-        .setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200))
+        .setExpectedMSQFault(new TooManyRowsInAWindowFault(15930, 200))
         .verifyResults();
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index cbaae8f1b8e..62496abacf6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -76,6 +76,7 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.msq.counters.CounterNames;
@@ -159,6 +160,7 @@ import 
org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.DirectStatement;
@@ -587,6 +589,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
     sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine, 
plannerFactory);
 
     authorizerMapper = CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER;
+
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
 
   protected CatalogResolver createMockCatalogResolver()
diff --git 
a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
 
b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
index 8d961f298ce..e66871186c0 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
@@ -158,12 +158,17 @@ public class AppendableMemory implements Closeable
 
     releaseLastBlockIfEmpty();
 
+    final int idx = currentBlockNumber();
+
+    // The request cannot be satisfied by the available bytes in the allocator
     if (bytes > allocator.available()) {
-      return false;
+      // Check if the last allocated block has enough memory to satisfy the 
request
+      if (idx < 0 || bytes + limits.getInt(idx) > 
blockHolders.get(idx).get().getCapacity()) {
+        // The request cannot be satisfied by the allocator and the last 
allocated block. Return false
+        return false;
+      }
     }
 
-    final int idx = currentBlockNumber();
-
     if (idx < 0 || bytes + limits.getInt(idx) > 
blockHolders.get(idx).get().getCapacity()) {
       // Allocation needed.
       // Math.max(allocationSize, bytes) in case "bytes" is greater than 
SOFT_MAXIMUM_ALLOCATION_SIZE.
diff --git 
a/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java
 
b/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java
new file mode 100644
index 00000000000..eca9f135c87
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.allocation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AppendableMemoryTest
+{
+
+  @Test
+  public void testReserveAdditionalWithLargeLastBlockAndSmallAllocator()
+  {
+    /*
+    This tests the edge case when the last chunk of memory allocated by the 
allocator has greater available free space
+    than what can be allocated by the allocator. In that case, the 
availableToReserve call should return the free memory from
+    the last allocated block, and reserveAdditional when called with that 
value should return true (and not do any additional
+    allocation). This test case assumes a lot about the implementation of the 
AppendableMemory, but a lot of the assertions made
+    in this test are logical, and should hold true. The final assertion is the 
most important which checks that the free space
+    in the last block takes precedence over the memory allocator, which should 
hold true irrespective of the implementation
+     */
+
+    // Allocator that can allocate atmost 100 bytes and AppendableMemory 
created with that allocator
+    MemoryAllocator memoryAllocator = new HeapMemoryAllocator(100);
+    AppendableMemory appendableMemory = 
AppendableMemory.create(memoryAllocator, 10);
+
+    // Reserves a chunk of 10 bytes. The call should return true since the 
allocator can allocate 100 bytes
+    Assert.assertTrue(appendableMemory.reserveAdditional(10));
+
+    // Last block is empty, the appendable memory is essentially empty
+    Assert.assertEquals(100, appendableMemory.availableToReserve());
+
+    // Advance the cursor so that it is not treated as empty chunk
+    appendableMemory.advanceCursor(4);
+
+    // We should be able to use the remaining 90 bytes from the allocator
+    Assert.assertEquals(90, appendableMemory.availableToReserve());
+
+    // Reserve a chunk of 80 bytes, and advance the cursor so that it is not 
treated as an empty chunk
+    Assert.assertTrue(appendableMemory.reserveAdditional(80));
+    appendableMemory.advanceCursor(4);
+
+    // At this point, we have 2 chunks with the following (used:free) / total 
statistics
+    // chunk0 - (4:6)/10
+    // chunk1 - (4:76)/80
+    // The allocator still has 10 additional bytes to reserve
+
+    // Even though the allocator has only 10 bytes, the last chunk has 76 free 
bytes which can be used. That would take precedence
+    // since that is a larger number
+    Assert.assertEquals(76, appendableMemory.availableToReserve());
+
+    // This assertion must always be true irrespective of the internal 
implementation
+    
Assert.assertTrue(appendableMemory.reserveAdditional(appendableMemory.availableToReserve()));
+  }
+
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java 
b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java
index 3103348284b..073b83dbe22 100644
--- a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java
@@ -344,7 +344,7 @@ public class FrameWriterTest extends 
InitializedNullHandlingTest
     for (final FrameWriterTestData.Dataset<?> dataset1 : 
FrameWriterTestData.DATASETS) {
       for (final FrameWriterTestData.Dataset<?> dataset2 : 
FrameWriterTestData.DATASETS) {
         final RowSignature signature = makeSignature(Arrays.asList(dataset1, 
dataset2));
-        final Sequence<List<Object>> rowSequence = 
unsortAndMakeRows(Arrays.asList(dataset1, dataset2));
+        final Sequence<List<Object>> rowSequence = 
unsortAndMakeRows(Arrays.asList(dataset1, dataset2), 1);
 
         final List<String> sortColumns = new ArrayList<>();
         sortColumns.add(signature.getColumnName(0));
@@ -378,7 +378,7 @@ public class FrameWriterTest extends 
InitializedNullHandlingTest
     // Test every possible capacity, up to the amount required to write all 
items from every list.
     Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType 
== FrameType.COLUMNAR);
     final RowSignature signature = makeSignature(FrameWriterTestData.DATASETS);
-    final Sequence<List<Object>> rowSequence = 
unsortAndMakeRows(FrameWriterTestData.DATASETS);
+    final Sequence<List<Object>> rowSequence = 
unsortAndMakeRows(FrameWriterTestData.DATASETS, 3);
     final int totalRows = rowSequence.toList().size();
 
     final List<String> sortColumns = new ArrayList<>();
@@ -648,7 +648,10 @@ public class FrameWriterTest extends 
InitializedNullHandlingTest
   /**
    * Create rows out of shuffled (unsorted) datasets.
    */
-  private static Sequence<List<Object>> unsortAndMakeRows(final 
List<FrameWriterTestData.Dataset<?>> datasets)
+  private static Sequence<List<Object>> unsortAndMakeRows(
+      final List<FrameWriterTestData.Dataset<?>> datasets,
+      final int multiplicationFactor
+  )
   {
     final List<List<Object>> retVal = new ArrayList<>();
 
@@ -672,7 +675,12 @@ public class FrameWriterTest extends 
InitializedNullHandlingTest
       retVal.add(row);
     }
 
-    return Sequences.simple(retVal);
+    List<List<Object>> multipliedRetVal = new ArrayList<>();
+    for (int i = 0; i < multiplicationFactor; ++i) {
+      multipliedRetVal.addAll(retVal);
+    }
+
+    return Sequences.simple(multipliedRetVal);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to