This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit a0d1fb7acb588f4eab940501deee738789e9af50
Author: Ian Maxon <[email protected]>
AuthorDate: Sun May 2 11:23:41 2021 -0700

    [ASTERIXDB-2896] Increase UDF argument buffer size

    -user model changes: no
    -storage format changes: no
    -interface changes: yes

    Details:

    Bump buffer sizes in Python IPC to 1MB for individual
    arguments, and in the case of batching, match the buffer
    size of the Hyracks IPC layer for deserialization.

    Change-Id: If847ac3b09406d1e9e6a976a7e0193b6e81bcc8b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11243
    Reviewed-by: Ian Maxon <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Contrib: Ian Maxon <[email protected]>
---
 .../ExternalScalarPythonFunctionEvaluator.java       |  9 ++++++---
 .../operators/ExternalAssignBatchRuntimeFactory.java | 19 ++++++++++++++++---
 .../asterix/external/util/ExternalDataUtils.java     | 19 +++++++++++++++++++
 3 files changed, 41 insertions(+), 6 deletions(-)

Review notes (patch commentary only; ignored by git am / git apply):

Purpose: this patch replaces the fixed Short.MAX_VALUE * 2 Python UDF
buffers with ExternalDataUtils.getArgBufferSize(). That size is
1024 * 1024 + 64 bytes by default. It is the "udf.buf.size" system property
(parsed by StorageUtil.getByteValue) plus 64 when that sum lies strictly
between 0 and Integer.MAX_VALUE. In the batched runtime, argHolders and
batchResults are allocated with that size rounded up by
roundUpToNearestFrameSize(..., ctx.getInitialFrameSize()).

- roundUpToNearestFrameSize(): ((size / framesize) + 1) * framesize always
  adds a whole frame. A size that is already a multiple of framesize
  (e.g. 32768 with framesize 32768) becomes 65536, and size 0 becomes one
  frame. The product is computed in int and can overflow. For example,
  getArgBufferSize() may return Integer.MAX_VALUE - 1, which rounds to
  2^31 with framesize 32768. The wrapped value would then be passed to
  ctx.allocateFrame(...).
- ExternalAssignBatchRuntimeFactory, hunk @@ -262,6 +269,12 @@: the resize
  tests result.getFirst().capacity() > outputWrapper.capacity(), but it
  rounds up outputWrapper.capacity(). The new buffer is therefore at most
  one frame larger than the old one and can still be smaller than
  result.getFirst(). Sizing from result.getFirst().capacity() is likely
  what was intended. Note that outputWrapper is still first created with
  ctx.allocateFrame() with no size argument, unlike argHolders and
  batchResults.
- rpcBufferSize is a non-final field of the factory. Every
  createOneOutputPushRuntime() call overwrites it, and it is read when
  argHolders/batchResults are allocated.
  NOTE(review): if one factory instance serves several tasks with
  different initial frame sizes, a local value captured by the runtime
  would be safer. Confirm how factories are shared.
- getArgBufferSize(): an out-of-range udf.buf.size silently falls back to
  the default. How a malformed value is reported depends on
  StorageUtil.getByteValue(), which is not shown in this patch.

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index eb87399..7c860a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
@@ -77,9 +78,11 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
         for (int i = 0; i < argValues.length; i++) {
             argValues[i] = VoidPointable.FACTORY.createPointable();
         }
-        //TODO: these should be dynamic
-        this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
-        this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+        //TODO: these should be dynamic. this static size picking is a temporary bodge until this works like
+        // v-size frames do or these construction buffers are removed entirely
+        int maxArgSz = ExternalDataUtils.getArgBufferSize();
+        this.argHolder = ByteBuffer.wrap(new byte[maxArgSz]);
+        this.outputWrapper = ByteBuffer.wrap(new byte[maxArgSz]);
         this.evaluatorContext = ctx;
         this.sourceLocation = sourceLoc;
         this.unpackerInput = new ArrayBufferInput(new byte[0]);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 39e480a..593bac6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.library.PythonLibraryEvaluator;
 import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -61,6 +62,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
     private final IExternalFunctionDescriptor[] fnDescs;
     private final int[][] fnArgColumns;
 
+    private int rpcBufferSize;
+
     public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
             int[][] fnArgColumns, int[] projectionList) {
         super(projectionList);
@@ -73,6 +76,9 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         final int[] projectionToOutColumns = new int[projectionList.length];
 
+        //this is a temporary bodge. these buffers need to work like vsize frames, or be absent entirely
+        int maxArgSz = ExternalDataUtils.getArgBufferSize();
+        rpcBufferSize = ExternalDataUtils.roundUpToNearestFrameSize(maxArgSz, ctx.getInitialFrameSize());
         for (int j = 0; j < projectionList.length; j++) {
             projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
         }
@@ -110,14 +116,14 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                 }
                 argHolders = new ArrayList<>(fnArgColumns.length);
                 for (int i = 0; i < fnArgColumns.length; i++) {
-                    argHolders.add(ctx.allocateFrame());
+                    argHolders.add(ctx.allocateFrame(rpcBufferSize));
                 }
                 outputWrapper = ctx.allocateFrame();
                 nullCalls = new ATypeTag[argHolders.size()][0];
                 numCalls = new int[fnArgColumns.length];
                 batchResults = new ArrayList<>(argHolders.size());
                 for (int i = 0; i < argHolders.size(); i++) {
-                    batchResults.add(new Pair<>(ctx.allocateFrame(), new Counter(-1)));
+                    batchResults.add(new Pair<>(ctx.allocateFrame(rpcBufferSize), new Counter(-1)));
                 }
                 unpackerInput = new ArrayBufferInput(new byte[0]);
                 unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
@@ -230,7 +236,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                         if (columnResult != null) {
                             Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
                             if (resultholder.getFirst().capacity() < columnResult.capacity()) {
-                                resultholder.setFirst(ctx.allocateFrame(columnResult.capacity()));
+                                resultholder.setFirst(ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+                                        columnResult.capacity(), ctx.getInitialFrameSize())));
                             }
                             ByteBuffer resultBuf = resultholder.getFirst();
                             resultBuf.clear();
@@ -262,6 +269,12 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                     outputWrapper.clear();
                     outputWrapper.position(0);
                     Pair<ByteBuffer, Counter> result = batchResults.get(k);
+                    if (result.getFirst() != null) {
+                        if (result.getFirst().capacity() > outputWrapper.capacity()) {
+                            outputWrapper = ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+                                    outputWrapper.capacity(), ctx.getInitialFrameSize()));
+                        }
+                    }
                     int start = outputWrapper.arrayOffset();
                     ATypeTag functionCalled = nullCalls[k][i];
                     if (functionCalled == ATypeTag.TYPE) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f5c62a5..8e94263 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -80,6 +80,7 @@ import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.util.StorageUtil;
 
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
@@ -106,6 +107,8 @@ import software.amazon.awssdk.services.s3.model.S3Response;
 public class ExternalDataUtils {
 
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+    private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
+    private static final int HEADER_FUDGE = 64;
 
     static {
         valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
@@ -997,4 +1000,20 @@ public class ExternalDataUtils {
             }
         }
     }
+
+    public static int roundUpToNearestFrameSize(int size, int framesize) {
+        return ((size / framesize) + 1) * framesize;
+    }
+
+    public static int getArgBufferSize() {
+        int maxArgSz = DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE;
+        String userArgSz = System.getProperty("udf.buf.size");
+        if (userArgSz != null) {
+            long parsedSize = StorageUtil.getByteValue(userArgSz) + HEADER_FUDGE;
+            if (parsedSize < Integer.MAX_VALUE && parsedSize > 0) {
+                maxArgSz = (int) parsedSize;
+            }
+        }
+        return maxArgSz;
+    }
 }
