This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 6c80e8fc009b37db0a8cb2993bf2c93ac56f480c
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Feb 8 20:20:57 2023 -0800

    [NO ISSUE][RUN] Incorrect accounting of tuple size during build phase of 
join
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    Change-Id: Idd732642424e38892c4f48c5ab77cdb3c747be22
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17367
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../IPartitionedTupleBufferManager.java            |  9 ++++++
 .../PreferToSpillFullyOccupiedFramePolicy.java     | 10 +++++++
 .../VPartitionTupleBufferManager.java              | 29 ++++++++++++-------
 .../dataflow/std/join/OptimizedHybridHashJoin.java | 33 ++++++++++++++++------
 4 files changed, 63 insertions(+), 18 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 8051305f94..3645855f5e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -74,6 +74,15 @@ public interface IPartitionedTupleBufferManager {
     boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int 
tupleId, TuplePointer pointer)
             throws HyracksDataException;
 
+    /**
+     * Returns the number of frames needed to accommodate the tuple.
+     *
+     * @param tupleSize tuple size
+     * @param fieldCount field count. 0 if the tuple size already accounts for 
the size of the field offsets.
+     * @return the number of frames needed to accommodate the tuple.
+     */
+    int framesNeeded(int tupleSize, int fieldCount);
+
     /**
      * Cancels the effect of last insertTuple() operation. i.e. undoes the 
last insertTuple() operation.
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index 12985c0bec..613a396831 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -98,4 +98,14 @@ public class PreferToSpillFullyOccupiedFramePolicy {
             }
         };
     }
+
+    public String partitionsStatus() {
+        StringBuilder sb = new StringBuilder();
+        int numPartitions = bufferManager.getNumPartitions();
+        for (int p = 0; p < numPartitions; p++) {
+            
sb.append("p:").append(p).append(",#t:").append(bufferManager.getNumTuples(p)).append(",s:")
+                    
.append(spilledStatus.get(p)).append(",s:").append(bufferManager.getPhysicalSize(p)).append('\n');
+        }
+        return sb.toString();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index d3d06cb7b7..722512a3f6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -132,15 +132,15 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
     @Override
     public boolean insertTuple(int partition, byte[] byteArray, int[] 
fieldEndOffsets, int start, int size,
             TuplePointer pointer) throws HyracksDataException {
-        int actualSize = calculateActualSize(fieldEndOffsets, size);
-        int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
+        int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length;
+        int fid = getLastBufferOrCreateNewIfNotExist(partition, size, 
fieldCount);
         if (fid < 0) {
             return false;
         }
         partitionArray[partition].getFrame(fid, tempInfo);
         int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, 
start, size);
         if (tid < 0) {
-            fid = createNewBuffer(partition, actualSize);
+            fid = createNewBuffer(partition, size, fieldCount);
             if (fid < 0) {
                 return false;
             }
@@ -170,6 +170,12 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
         numTuples[partition]--;
     }
 
+    @Override
+    public int framesNeeded(int tupleSize, int fieldCount) {
+        int minFrameSize = framePool.getMinFrameSize();
+        return FrameHelper.calcAlignedFrameSizeToStore(fieldCount, tupleSize, 
minFrameSize) / minFrameSize;
+    }
+
     public static int calculateActualSize(int[] fieldEndOffsets, int size) {
         if (fieldEndOffsets != null) {
             return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -189,8 +195,8 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
         return externalFrameId / getNumPartitions();
     }
 
-    private int createNewBuffer(int partition, int size) throws 
HyracksDataException {
-        ByteBuffer newBuffer = requestNewBufferFromPool(size, partition);
+    private int createNewBuffer(int partition, int tupleSize, int fieldCount) 
throws HyracksDataException {
+        ByteBuffer newBuffer = requestNewBufferFromPool(tupleSize, partition, 
fieldCount);
         if (newBuffer == null) {
             return -1;
         }
@@ -199,9 +205,11 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
         return partitionArray[partition].insertFrame(newBuffer);
     }
 
-    private ByteBuffer requestNewBufferFromPool(int recordSize, int partition) 
throws HyracksDataException {
-        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, 
framePool.getMinFrameSize());
-        if ((double) frameSize / (double) framePool.getMinFrameSize() + 
getPhysicalSize(partition) > constrain
+    private ByteBuffer requestNewBufferFromPool(int recordSize, int partition, 
int fieldCount)
+            throws HyracksDataException {
+        int minFrameSize = framePool.getMinFrameSize();
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(fieldCount, 
recordSize, minFrameSize);
+        if ((double) frameSize / (double) minFrameSize + 
getPhysicalSize(partition) / (double) minFrameSize > constrain
                 .frameLimit(partition)) {
             return null;
         }
@@ -238,10 +246,11 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
         }
     }
 
-    private int getLastBufferOrCreateNewIfNotExist(int partition, int 
actualSize) throws HyracksDataException {
+    private int getLastBufferOrCreateNewIfNotExist(int partition, int 
tupleSize, int fieldCount)
+            throws HyracksDataException {
         if (partitionArray[partition] == null || 
partitionArray[partition].getNumFrames() == 0) {
             partitionArray[partition] = new FrameBufferManager();
-            return createNewBuffer(partition, actualSize);
+            return createNewBuffer(partition, tupleSize, fieldCount);
         }
         return getLastBuffer(partition);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 7a9bb25ff3..5f80165fed 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -49,6 +49,8 @@ import 
org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManage
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * This class mainly applies one level of HHJ on a pair of
@@ -56,6 +58,7 @@ import 
org.apache.hyracks.dataflow.std.structures.TuplePointer;
  */
 public class OptimizedHybridHashJoin {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     // Used for special probe BigObject which can not be held into the Join 
memory
     private FrameTupleAppender bigFrameAppender;
 
@@ -152,19 +155,23 @@ public class OptimizedHybridHashJoin {
     private void processTupleBuildPhase(int tid, int pid) throws 
HyracksDataException {
         // insertTuple prevents the tuple to acquire a number of frames that 
is > the frame limit
         while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            int recordSize = 
VPartitionTupleBufferManager.calculateActualSize(null, 
accessorBuild.getTupleLength(tid));
-            double numFrames = (double) recordSize / (double) 
jobletCtx.getInitialFrameSize();
+            int numFrames = 
bufferManager.framesNeeded(accessorBuild.getTupleLength(tid), 0);
             int victimPartition;
-            if (numFrames > bufferManager.getConstrain().frameLimit(pid)
-                    || (victimPartition = 
spillPolicy.selectVictimPartition(pid)) < 0) {
+            int partitionFrameLimit = 
bufferManager.getConstrain().frameLimit(pid);
+            if (numFrames > partitionFrameLimit || (victimPartition = 
spillPolicy.selectVictimPartition(pid)) < 0) {
                 // insert request can never be satisfied
-                if (numFrames > memSizeInFrames || recordSize < 
jobletCtx.getInitialFrameSize()) {
-                    // the tuple is greater than the memory budget or although 
the record is small we could not find
-                    // a frame for it (possibly due to a bug)
+                if (numFrames > memSizeInFrames) {
+                    // the tuple is greater than the memory budget
+                    logTupleInsertionFailure(tid, pid, numFrames, 
partitionFrameLimit);
                     throw 
HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY);
                 }
+                if (numFrames <= 1) {
+                    // this shouldn't happen. whether the partition is spilled 
or not, it should be able to get 1 frame
+                    logTupleInsertionFailure(tid, pid, numFrames, 
partitionFrameLimit);
+                    throw new IllegalStateException("can't insert tuple in 
join memory");
+                }
                 // Record is large but insertion failed either 1) we could not 
satisfy the request because of the
-                // frame limit or 2) we could not find a victim anymore 
(exhaused all victims) and the partition is
+                // frame limit or 2) we could not find a victim anymore 
(exhausted all victims) and the partition is
                 // memory-resident with no frame.
                 flushBigObjectToDisk(pid, accessorBuild, tid, buildRFWriters, 
buildRelName);
                 spilledStatus.set(pid);
@@ -613,4 +620,14 @@ public class OptimizedHybridHashJoin {
         }
         this.isReversed = reversed;
     }
+
+    private void logTupleInsertionFailure(int tid, int pid, int numFrames, int 
partitionFrameLimit) {
+        int recordSize = 
VPartitionTupleBufferManager.calculateActualSize(null, 
accessorBuild.getTupleLength(tid));
+        String details = String.format(
+                "partition %s, tuple size %s, needed # frames %s, partition 
frame limit %s, join "
+                        + "memory in frames %s, initial frame size %s",
+                pid, recordSize, numFrames, partitionFrameLimit, 
memSizeInFrames, jobletCtx.getInitialFrameSize());
+        LOGGER.debug("can't insert tuple in join memory. {}", details);
+        LOGGER.debug("partitions status:\n{}", spillPolicy.partitionsStatus());
+    }
 }

Reply via email to