This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6f5ef0a4da5 [fix](mc) fix memory leak and optimize large data write
for MaxCompute connector (#61245) (#64449)
6f5ef0a4da5 is described below
commit 6f5ef0a4da5579ae25dfecc9bf556ffefb00f61e
Author: daidai <[email protected]>
AuthorDate: Fri Jun 12 22:41:24 2026 +0800
[fix](mc) fix memory leak and optimize large data write for MaxCompute
connector (#61245) (#64449)
pick #61245
Fix:
- Fix potential memory leak in MaxComputeJniScanner by closing
currentSplitReader in close().
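Optimization:
- Cap each MaxComputeJniScanner batch with a 256MB byte budget (measured
per variable-width column) so very large STRING/BINARY rows no longer
produce multi-GB blocks.
- mc.max_field_size_bytes: maximum field size in bytes for the write
session (default: 8MB)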
---
.../doris/maxcompute/MaxComputeJniScanner.java | 164 ++++++++++++++++++---
.../doris/common/maxcompute/MCProperties.java | 3 +
.../maxcompute/MaxComputeExternalCatalog.java | 8 +
3 files changed, 158 insertions(+), 17 deletions(-)
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 0b196fcb9cd..336991f3802 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.table.read.split.InputSplit;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
import com.aliyun.odps.table.read.split.impl.RowRangeInputSplit;
import com.google.common.base.Strings;
+import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.log4j.Logger;
@@ -60,6 +61,13 @@ public class MaxComputeJniScanner extends JniScanner {
private static final Logger LOG =
Logger.getLogger(MaxComputeJniScanner.class);
+    // Byte budget for one batch produced by readVectors(). Rows are appended until the
+    // estimated data bytes of the largest top-level String/Binary column reach this
+    // budget, which bounds the C++ Block built from the batch. Without it, wide rows
+    // (e.g. ~585KB STRING values) with batch_size=4096 yield Blocks of roughly 2.4GB.
+    // AsyncResultWriter queues up to 3 Blocks per pipeline instance, so queued memory is
+    // about instances * 3 * block_size, e.g. 5 * 3 * 256MB = 3.75GB.
+    private static final long MAX_BATCH_BYTES = 256L * 1024 * 1024;
+
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
private static final String ENDPOINT = "endpoint";
@@ -86,10 +94,12 @@ public class MaxComputeJniScanner extends JniScanner {
private TableBatchReadSession scan;
public String sessionId;
- private String project; //final ???
+ private String project;
private String table;
private SplitReader<VectorSchemaRoot> currentSplitReader;
+ private VectorSchemaRoot currentBatch = null;
+ private int currentBatchRowOffset = 0;
private MaxComputeColumnValue columnValue;
private Map<String, Integer> readColumnsToId;
@@ -215,8 +225,17 @@ public class MaxComputeJniScanner extends JniScanner {
@Override
public void close() throws IOException {
+ if (currentSplitReader != null) {
+ try {
+ currentSplitReader.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close MaxCompute split reader for table "
+ project + "." + table, e);
+ }
+ }
startOffset = -1;
splitSize = -1;
+ currentBatch = null;
+ currentBatchRowOffset = 0;
currentSplitReader = null;
settings = null;
scan = null;
@@ -234,45 +253,74 @@ public class MaxComputeJniScanner extends JniScanner {
return readVectors(expectedRows);
}
+    /**
+     * Fetches the next Arrow batch from the current split reader.
+     *
+     * <p>When the reader reports that no batches remain, it is closed, the field is
+     * cleared, and {@code null} is returned. If the reader has already been cleared by an
+     * earlier call (readVectors() may return a partial batch right after exhausting the
+     * reader), {@code null} is returned immediately instead of dereferencing a null reader.
+     *
+     * @return the next batch, or {@code null} when the split has no more data
+     * @throws IOException if checking for, fetching, or closing the reader fails
+     */
+    private VectorSchemaRoot getNextBatch() throws IOException {
+        if (currentSplitReader == null) {
+            // Already exhausted and closed by a previous call.
+            return null;
+        }
+        try {
+            if (!currentSplitReader.hasNext()) {
+                currentSplitReader.close();
+                currentSplitReader = null;
+                return null;
+            }
+            return currentSplitReader.get();
+        } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner readVectors get batch fail";
+            LOG.warn(errorMsg, e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
private int readVectors(int expectedRows) throws IOException {
int curReadRows = 0;
+ long accumulatedBytes = 0;
while (curReadRows < expectedRows) {
- try {
- if (!currentSplitReader.hasNext()) {
- currentSplitReader.close();
- currentSplitReader = null;
+ // Stop early if accumulated variable-width bytes approach int32
limit
+ if (accumulatedBytes >= MAX_BATCH_BYTES) {
+ break;
+ }
+ if (currentBatch == null) {
+ currentBatch = getNextBatch();
+ if (currentBatch == null || currentBatch.getRowCount() == 0) {
+ currentBatch = null;
break;
}
- } catch (Exception e) {
- String errorMsg = "MaxComputeJniScanner readVectors hasNext
fail";
- LOG.warn(errorMsg, e);
- throw new IOException(e.getMessage(), e);
+ currentBatchRowOffset = 0;
}
-
try {
- VectorSchemaRoot data = currentSplitReader.get();
- if (data.getRowCount() == 0) {
+ int rowsToAppend = Math.min(expectedRows - curReadRows,
+ currentBatch.getRowCount() - currentBatchRowOffset);
+ List<FieldVector> fieldVectors =
currentBatch.getFieldVectors();
+
+ // Limit rows to avoid int32 overflow in VectorColumn's String
byte buffer
+ rowsToAppend = limitRowsByVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend,
+ MAX_BATCH_BYTES - accumulatedBytes);
+ if (rowsToAppend <= 0) {
break;
}
- List<FieldVector> fieldVectors = data.getFieldVectors();
- int batchRows = 0;
long startTime = System.nanoTime();
for (FieldVector column : fieldVectors) {
Integer readColumnId =
readColumnsToId.get(column.getName());
- batchRows = column.getValueCount();
if (readColumnId == null) {
continue;
}
columnValue.reset(column);
- for (int j = 0; j < batchRows; j++) {
+ for (int j = currentBatchRowOffset; j <
currentBatchRowOffset + rowsToAppend; j++) {
columnValue.setColumnIdx(j);
appendData(readColumnId, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;
- curReadRows += batchRows;
+ // Track bytes for the rows just appended
+ accumulatedBytes += estimateVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+ currentBatchRowOffset += rowsToAppend;
+ curReadRows += rowsToAppend;
+ if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+ currentBatch = null;
+ currentBatchRowOffset = 0;
+ }
} catch (Exception e) {
String errorMsg = String.format("MaxComputeJniScanner Fail to
read arrow data. "
+ "curReadRows = {}, expectedRows = {}", curReadRows,
expectedRows);
@@ -280,9 +328,91 @@ public class MaxComputeJniScanner extends JniScanner {
throw new RuntimeException(errorMsg, e);
}
}
+ if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows <
expectedRows) {
+ LOG.debug("readVectors: returning " + curReadRows + " rows
(limited by byte budget)"
+ + ", totalVarWidthBytes=" + accumulatedBytes
+ + ", expectedRows=" + expectedRows);
+ }
return curReadRows;
}
+    /**
+     * Limits how many rows may be appended so that, for every top-level
+     * {@link BaseVariableWidthVector} column, the data bytes of the appended rows stay
+     * within {@code remainingBudget}. This prevents int32 overflow in VectorColumn's
+     * appendIndex for the String/Binary child byte arrays.
+     *
+     * <p>Sizes are read from each vector's offset buffer with a binary search per column,
+     * so no value data is copied. Variable-width data nested inside vectors that are not
+     * themselves {@link BaseVariableWidthVector}s (e.g. list or struct children) is not
+     * inspected by this check.
+     *
+     * @param fieldVectors    columns of the current Arrow batch
+     * @param offset          first row in the batch that has not been appended yet
+     * @param maxRows         upper bound on the number of rows to append
+     * @param remainingBudget bytes still available for the current scanner batch
+     * @return 0 when there is no budget left or {@code maxRows <= 0}; otherwise a value in
+     *         [1, maxRows]. At least one row is returned even if that row alone exceeds the
+     *         budget, so the scan always makes progress.
+     */
+    private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int maxRows, long remainingBudget) {
+        if (remainingBudget <= 0 || maxRows <= 0) {
+            // Nothing may be appended; never report more rows than the caller allowed.
+            return 0;
+        }
+        int safeRows = maxRows;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                // Rows that fit within the budget for THIS column.
+                int rows = findMaxRowsWithinBudget(vec, offset, maxRows, remainingBudget);
+                safeRows = Math.min(safeRows, rows);
+            }
+        }
+        // Force progress: a single oversized row is still emitted on its own.
+        return Math.max(1, safeRows);
+    }
+
+ /**
+ * Binary search for the maximum number of rows starting at 'offset'
+ * whose total bytes in the variable-width vector fit within 'budget'.
+ */
+ private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+ int offset, int maxRows, long budget) {
+ if (maxRows <= 0) {
+ return 0;
+ }
+ // Total bytes for all maxRows
+ long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
maxRows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+ if (totalBytes <= budget) {
+ return maxRows;
+ }
+ // Binary search for the cutoff point
+ int lo = 1;
+ int hi = maxRows - 1;
+ int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+ while (lo <= hi) {
+ int mid = lo + (hi - lo) / 2;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
mid) * 4) - startOff;
+ if (bytes <= budget) {
+ lo = mid + 1;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ // 'hi' is the largest count whose bytes <= budget (could be 0)
+ return hi;
+ }
+
+ /**
+ * Estimate total variable-width bytes for the given row range across all
columns.
+ * Returns the max bytes of any single column (since each column has its
own
+ * VectorColumn child buffer and the overflow is per-column).
+ */
+ private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+ int offset, int rows) {
+ long maxColumnBytes = 0;
+ for (FieldVector fv : fieldVectors) {
+ if (fv instanceof BaseVariableWidthVector) {
+ BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long)
(offset + rows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset *
4);
+ maxColumnBytes = Math.max(maxColumnBytes, bytes);
+ }
+ }
+ return maxColumnBytes;
+ }
+
private static Object deserialize(String serializedString) throws
IOException, ClassNotFoundException {
byte[] serializedBytes = Base64.getDecoder().decode(serializedString);
ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(serializedBytes);
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
index 627f3bc03e2..9cdf67bba7f 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
@@ -57,6 +57,9 @@ public class MCProperties {
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
+    // Catalog property: maximum field size in bytes for the MaxCompute write session.
+    public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
+    // Default value for MAX_FIELD_SIZE: 8 * 1024 * 1024 bytes = 8MB.
+    public static final String DEFAULT_MAX_FIELD_SIZE = "8388608";
+
//withCrossPartition(true):
// Very friendly to scenarios where there are many partitions but
each partition is very small.
//withCrossPartition(false):
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index edf339e6702..f03fce35fd7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -67,6 +67,7 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
private int connectTimeout;
private int readTimeout;
private int retryTimes;
+ private long maxFieldSize;
public boolean dateTimePredicatePushDown;
@@ -189,6 +190,8 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
props.getOrDefault(MCProperties.READ_TIMEOUT,
MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT,
MCProperties.DEFAULT_RETRY_COUNT));
+ maxFieldSize = Long.parseLong(
+ props.getOrDefault(MCProperties.MAX_FIELD_SIZE,
MCProperties.DEFAULT_MAX_FIELD_SIZE));
RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
@@ -310,6 +313,11 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
return readTimeout;
}
+    /**
+     * Returns the {@code mc.max_field_size_bytes} catalog setting in bytes
+     * (8388608, i.e. 8MB, when the property is absent), after calling
+     * {@code makeSureInitialized()} so the value has been parsed from the properties.
+     */
+    public long getMaxFieldSize() {
+        makeSureInitialized();
+        return this.maxFieldSize;
+    }
+
public boolean getDateTimePredicatePushDown() {
return dateTimePredicatePushDown;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]