This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6c80e8fc009b37db0a8cb2993bf2c93ac56f480c Author: Ali Alsuliman <[email protected]> AuthorDate: Wed Feb 8 20:20:57 2023 -0800 [NO ISSUE][RUN] Incorrect accounting of tuple size during build phase of join - user model changes: no - storage format changes: no - interface changes: no Details: Change-Id: Idd732642424e38892c4f48c5ab77cdb3c747be22 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17367 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../IPartitionedTupleBufferManager.java | 9 ++++++ .../PreferToSpillFullyOccupiedFramePolicy.java | 10 +++++++ .../VPartitionTupleBufferManager.java | 29 ++++++++++++------- .../dataflow/std/join/OptimizedHybridHashJoin.java | 33 ++++++++++++++++------ 4 files changed, 63 insertions(+), 18 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java index 8051305f94..3645855f5e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java @@ -74,6 +74,15 @@ public interface IPartitionedTupleBufferManager { boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer) throws HyracksDataException; + /** + * Returns the number of frames needed to accommodate the tuple. + * + * @param tupleSize tuple size + * @param fieldCount field count. 0 if the tuple size already accounts for fields offsets size. + * @return the number of frames needed to accommodate the tuple. + */ + int framesNeeded(int tupleSize, int fieldCount); + /** * Cancels the effect of last insertTuple() operation. i.e. undoes the last insertTuple() operation. */ diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java index 12985c0bec..613a396831 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java @@ -98,4 +98,14 @@ public class PreferToSpillFullyOccupiedFramePolicy { } }; } + + public String partitionsStatus() { + StringBuilder sb = new StringBuilder(); + int numPartitions = bufferManager.getNumPartitions(); + for (int p = 0; p < numPartitions; p++) { + sb.append("p:").append(p).append(",#t:").append(bufferManager.getNumTuples(p)).append(",s:") + .append(spilledStatus.get(p)).append(",s:").append(bufferManager.getPhysicalSize(p)).append('\n'); + } + return sb.toString(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java index d3d06cb7b7..722512a3f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java @@ -132,15 +132,15 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana @Override public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size, TuplePointer pointer) throws HyracksDataException { - int actualSize = calculateActualSize(fieldEndOffsets, size); - int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize); + int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length; + int fid = getLastBufferOrCreateNewIfNotExist(partition, size, fieldCount); if (fid < 0) { return false; } partitionArray[partition].getFrame(fid, tempInfo); int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size); if (tid < 0) { - fid = createNewBuffer(partition, actualSize); + fid = createNewBuffer(partition, size, fieldCount); if (fid < 0) { return false; } @@ -170,6 +170,12 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana numTuples[partition]--; } + @Override + public int framesNeeded(int tupleSize, int fieldCount) { + int minFrameSize = framePool.getMinFrameSize(); + return FrameHelper.calcAlignedFrameSizeToStore(fieldCount, tupleSize, minFrameSize) / minFrameSize; + } + public static int calculateActualSize(int[] fieldEndOffsets, int size) { if (fieldEndOffsets != null) { return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size); @@ -189,8 +195,8 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana return externalFrameId / getNumPartitions(); } - private int createNewBuffer(int partition, int size) throws HyracksDataException { - ByteBuffer newBuffer = requestNewBufferFromPool(size, partition); + private int createNewBuffer(int partition, int tupleSize, int fieldCount) throws HyracksDataException { + ByteBuffer newBuffer = requestNewBufferFromPool(tupleSize, partition, fieldCount); if (newBuffer == null) { return -1; } @@ -199,9 +205,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana return partitionArray[partition].insertFrame(newBuffer); } - private ByteBuffer requestNewBufferFromPool(int recordSize, int partition) throws HyracksDataException { - int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize()); - if ((double) frameSize / (double) framePool.getMinFrameSize() + getPhysicalSize(partition) > constrain + private ByteBuffer requestNewBufferFromPool(int recordSize, int partition, int fieldCount) + throws HyracksDataException { + int minFrameSize = framePool.getMinFrameSize(); + int frameSize = FrameHelper.calcAlignedFrameSizeToStore(fieldCount, recordSize, minFrameSize); + if ((double) frameSize / (double) minFrameSize + getPhysicalSize(partition) / (double) minFrameSize > constrain .frameLimit(partition)) { return null; } @@ -238,10 +246,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana } } - private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException { + private int getLastBufferOrCreateNewIfNotExist(int partition, int tupleSize, int fieldCount) + throws HyracksDataException { if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) { partitionArray[partition] = new FrameBufferManager(); - return createNewBuffer(partition, actualSize); + return createNewBuffer(partition, tupleSize, fieldCount); } return getLastBuffer(partition); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 7a9bb25ff3..5f80165fed 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -49,6 +49,8 @@ import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManage import org.apache.hyracks.dataflow.std.structures.ISerializableTable; import org.apache.hyracks.dataflow.std.structures.SerializableHashTable; import org.apache.hyracks.dataflow.std.structures.TuplePointer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * This class mainly applies one level of HHJ on a pair of @@ -56,6 +58,7 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer; */ public class OptimizedHybridHashJoin { + private static final Logger LOGGER = LogManager.getLogger(); // Used for special probe BigObject which can not be held into the Join memory private FrameTupleAppender bigFrameAppender; @@ -152,19 +155,23 @@ public class OptimizedHybridHashJoin { private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException { // insertTuple prevents the tuple to acquire a number of frames that is > the frame limit while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { - int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid)); - double numFrames = (double) recordSize / (double) jobletCtx.getInitialFrameSize(); + int numFrames = bufferManager.framesNeeded(accessorBuild.getTupleLength(tid), 0); int victimPartition; - if (numFrames > bufferManager.getConstrain().frameLimit(pid) - || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) { + int partitionFrameLimit = bufferManager.getConstrain().frameLimit(pid); + if (numFrames > partitionFrameLimit || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) { // insert request can never be satisfied - if (numFrames > memSizeInFrames || recordSize < jobletCtx.getInitialFrameSize()) { - // the tuple is greater than the memory budget or although the record is small we could not find - // a frame for it (possibly due to a bug) + if (numFrames > memSizeInFrames) { + // the tuple is greater than the memory budget + logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit); throw HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY); } + if (numFrames <= 1) { + // this shouldn't happen. whether the partition is spilled or not, it should be able to get 1 frame + logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit); + throw new IllegalStateException("can't insert tuple in join memory"); + } // Record is large but insertion failed either 1) we could not satisfy the request because of the - // frame limit or 2) we could not find a victim anymore (exhaused all victims) and the partition is + // frame limit or 2) we could not find a victim anymore (exhausted all victims) and the partition is // memory-resident with no frame. flushBigObjectToDisk(pid, accessorBuild, tid, buildRFWriters, buildRelName); spilledStatus.set(pid); @@ -613,4 +620,14 @@ public class OptimizedHybridHashJoin { } this.isReversed = reversed; } + + private void logTupleInsertionFailure(int tid, int pid, int numFrames, int partitionFrameLimit) { + int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid)); + String details = String.format( + "partition %s, tuple size %s, needed # frames %s, partition frame limit %s, join " + + "memory in frames %s, initial frame size %s", + pid, recordSize, numFrames, partitionFrameLimit, memSizeInFrames, jobletCtx.getInitialFrameSize()); + LOGGER.debug("can't insert tuple in join memory. {}", details); + LOGGER.debug("partitions status:\n{}", spillPolicy.partitionsStatus()); + } }
