github-actions[bot] commented on code in PR #61245:
URL: https://github.com/apache/doris/pull/61245#discussion_r2963556229


##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -210,37 +220,213 @@ protected void writeInternal(VectorTable inputTable) 
throws IOException {
         }
 
         try {
-            Object[][] data = inputTable.getMaterializedData();
+            // Materialize and write data in chunks to avoid Java heap OOM.
+            // getMaterializedData() copies off-heap data to Java heap as 
Object[].
+            // For large String columns, each row can consume ~1MB of heap.
+            // We limit each chunk to fit within available heap memory.
+            int materializeChunkRows = computeSafeMaterializeRows(inputTable, 
numRows, numCols);
 
-            // Get a pre-allocated VectorSchemaRoot from the batch writer
-            VectorSchemaRoot root = batchWriter.newElement();
-            root.setRowCount(numRows);
+            int rowOffset = 0;
+            while (rowOffset < numRows) {
+                int chunkRows = Math.min(materializeChunkRows, numRows - 
rowOffset);
 
-            for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
-                OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
-                fillArrowVector(root, col, odpsType, data[col], numRows);
-            }
+                // Materialize only this chunk's data from off-heap to Java 
heap
+                Object[][] chunkData = inputTable.getMaterializedData(
+                        rowOffset, rowOffset + chunkRows, 
java.util.Collections.emptyMap());
+
+                if (LOG.isDebugEnabled()) {
+                    Runtime rt = Runtime.getRuntime();
+                    LOG.debug("writeInternal: chunk rowOffset=" + rowOffset
+                            + ", chunkRows=" + chunkRows + " of " + numRows
+                            + ", heapUsed=" + ((rt.totalMemory() - 
rt.freeMemory()) / 1024 / 1024) + "MB"
+                            + ", heapMax=" + (rt.maxMemory() / 1024 / 1024) + 
"MB");
+                }
+
+                // Further split this chunk if variable-width bytes exceed 
Arrow's int32 limit.
+                // Note: chunkData is indexed from 0, so we pass 
chunkRowOffset=0.
+                int chunkOffset = 0;
+                while (chunkOffset < chunkRows) {
+                    int batchRows = Math.min(chunkRows - chunkOffset, 
maxWriteBatchRows);
+
+                    batchRows = limitWriteBatchByBytes(chunkData, numCols, 
chunkOffset, batchRows);
+
+                    VectorSchemaRoot root = batchWriter.newElement();
+                    try {
+                        root.setRowCount(batchRows);
 
-            batchWriter.write(root);
-            writtenRows += numRows;
+                        for (int col = 0; col < numCols && col < 
columnTypeInfos.size(); col++) {
+                            OdpsType odpsType = 
columnTypeInfos.get(col).getOdpsType();
+                            fillArrowVector(root, col, odpsType, 
chunkData[col],
+                                    chunkOffset, batchRows);
+                        }
+
+                        batchWriter.write(root);
+                    } finally {
+                        root.close();
+                    }
+                    writtenRows += batchRows;
+                    chunkOffset += batchRows;
+                }
+
+                rowOffset += chunkRows;
+                // chunkData goes out of scope here, eligible for GC
+            }
         } catch (Exception e) {
             String errorMsg = "Failed to write data to MaxCompute table " + 
project + "." + tableName;
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         }
     }
 
+    /**
+     * Compute a safe number of rows to materialize at once based on available
+     * Java heap memory. For tables with large String columns, materializing 
all
+     * rows at once can cause OOM (e.g., 1790 rows × 600KB = 1.5GB heap spike).
+     *
+     * Strategy: materialize a small probe batch (10 rows), measure actual heap
+     * growth, then compute safe chunk size to fit within 1/4 of free heap.
+     */
+    private int computeSafeMaterializeRows(VectorTable inputTable, int 
numRows, int numCols) {
+        if (numRows == 0) {
+            return maxWriteBatchRows;
+        }
+
+        // Check if there are variable-width columns
+        boolean hasVarWidth = false;
+        for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
+            if (isVariableWidthType(columnTypeInfos.get(col).getOdpsType())) {
+                hasVarWidth = true;
+                break;
+            }
+        }
+        if (!hasVarWidth) {
+            return maxWriteBatchRows;
+        }
+
+        // Probe: materialize a small sample to measure actual heap cost per 
row
+        int probeRows = Math.min(10, numRows);
+        Runtime rt = Runtime.getRuntime();
+        rt.gc(); // Request GC to get a cleaner measurement
+        long heapBefore = rt.totalMemory() - rt.freeMemory();
+        long heapAfter = rt.totalMemory() - rt.freeMemory();

Review Comment:
   **BUG**: This heap probe is dead code. `heapBefore` and `heapAfter` are 
measured on consecutive lines with no materialization in between. The Javadoc 
describes the strategy as "materialize a small probe batch", but the code never 
calls `inputTable.getMaterializedData(...)` between the two measurements.
   
   As a result, `probeHeapCost` will always be ~0, and the method always falls 
through to the 1MB/row fallback (`bytesPerRow = 1024 * 1024`). While the 
fallback is conservative and safe, the probe logic is misleading and 
ineffective.
   
   To fix, you need to actually materialize the probe batch between the two 
heap measurements:
   ```java
   long heapBefore = rt.totalMemory() - rt.freeMemory();
   Object[][] probeData = inputTable.getMaterializedData(
           0, probeRows, java.util.Collections.emptyMap());
   long heapAfter = rt.totalMemory() - rt.freeMemory();
   ```
   Or, if the intent is to just use the fallback, remove the dead probe code to 
avoid confusion.
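   
   For illustration, a minimal sketch of how the fixed probe could feed the 
chunk-size calculation. The 1MB/row fallback and the "1/4 of free heap" target 
come from the PR; everything else is an assumption: the names 
`ProbeChunkSizer`, `safeRows` and `probe` are hypothetical, the `IntFunction` 
stands in for `inputTable.getMaterializedData(0, probeRows, ...)`, and "free 
heap" is taken to be `maxMemory()` minus used heap.

```java
import java.util.function.IntFunction;

final class ProbeChunkSizer {
    // Conservative fallback from the PR: assume 1MB of heap per row.
    static final long FALLBACK_BYTES_PER_ROW = 1024L * 1024L;

    /**
     * Pure arithmetic: how many rows fit in a quarter of the free heap,
     * clamped to [1, maxRows]. Falls back to 1MB/row when the probe
     * measured no growth (for example, because a GC ran in between).
     */
    static int safeRows(long probeHeapCost, int probeRows, long freeHeap, int maxRows) {
        long bytesPerRow = (probeHeapCost > 0 && probeRows > 0)
                ? Math.max(1L, probeHeapCost / probeRows)
                : FALLBACK_BYTES_PER_ROW;
        long rows = (freeHeap / 4) / bytesPerRow;
        return (int) Math.max(1L, Math.min(rows, (long) maxRows));
    }

    /**
     * Materializes a small probe batch BETWEEN the two heap measurements.
     * 'materialize' is a hypothetical stand-in for the real table call.
     */
    static int probe(IntFunction<Object[][]> materialize, int numRows, int maxRows) {
        int probeRows = Math.min(10, numRows);
        Runtime rt = Runtime.getRuntime();
        long heapBefore = rt.totalMemory() - rt.freeMemory();
        Object[][] probeData = materialize.apply(probeRows);
        long heapAfter = rt.totalMemory() - rt.freeMemory();
        long freeHeap = rt.maxMemory() - heapAfter;
        int result = safeRows(heapAfter - heapBefore, probeRows, freeHeap, maxRows);
        // Touch probeData after measuring so it stays reachable during the probe.
        if (probeData == null) {
            throw new IllegalStateException("materializer returned null");
        }
        return result;
    }
}
```

   Heap deltas measured this way are noisy, so the sketch keeps the fallback 
for non-positive measurements rather than trusting them.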



##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -210,37 +220,213 @@ protected void writeInternal(VectorTable inputTable) 
throws IOException {
         }
 
         try {
-            Object[][] data = inputTable.getMaterializedData();
+            // Materialize and write data in chunks to avoid Java heap OOM.
+            // getMaterializedData() copies off-heap data to Java heap as 
Object[].
+            // For large String columns, each row can consume ~1MB of heap.
+            // We limit each chunk to fit within available heap memory.
+            int materializeChunkRows = computeSafeMaterializeRows(inputTable, 
numRows, numCols);
 
-            // Get a pre-allocated VectorSchemaRoot from the batch writer
-            VectorSchemaRoot root = batchWriter.newElement();
-            root.setRowCount(numRows);
+            int rowOffset = 0;
+            while (rowOffset < numRows) {
+                int chunkRows = Math.min(materializeChunkRows, numRows - 
rowOffset);
 
-            for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
-                OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
-                fillArrowVector(root, col, odpsType, data[col], numRows);
-            }
+                // Materialize only this chunk's data from off-heap to Java 
heap
+                Object[][] chunkData = inputTable.getMaterializedData(
+                        rowOffset, rowOffset + chunkRows, 
java.util.Collections.emptyMap());
+
+                if (LOG.isDebugEnabled()) {
+                    Runtime rt = Runtime.getRuntime();
+                    LOG.debug("writeInternal: chunk rowOffset=" + rowOffset
+                            + ", chunkRows=" + chunkRows + " of " + numRows
+                            + ", heapUsed=" + ((rt.totalMemory() - 
rt.freeMemory()) / 1024 / 1024) + "MB"
+                            + ", heapMax=" + (rt.maxMemory() / 1024 / 1024) + 
"MB");
+                }
+
+                // Further split this chunk if variable-width bytes exceed 
Arrow's int32 limit.
+                // Note: chunkData is indexed from 0, so we pass 
chunkRowOffset=0.
+                int chunkOffset = 0;
+                while (chunkOffset < chunkRows) {
+                    int batchRows = Math.min(chunkRows - chunkOffset, 
maxWriteBatchRows);
+
+                    batchRows = limitWriteBatchByBytes(chunkData, numCols, 
chunkOffset, batchRows);
+
+                    VectorSchemaRoot root = batchWriter.newElement();
+                    try {
+                        root.setRowCount(batchRows);
 
-            batchWriter.write(root);
-            writtenRows += numRows;
+                        for (int col = 0; col < numCols && col < 
columnTypeInfos.size(); col++) {
+                            OdpsType odpsType = 
columnTypeInfos.get(col).getOdpsType();
+                            fillArrowVector(root, col, odpsType, 
chunkData[col],
+                                    chunkOffset, batchRows);
+                        }
+
+                        batchWriter.write(root);
+                    } finally {
+                        root.close();
+                    }
+                    writtenRows += batchRows;
+                    chunkOffset += batchRows;
+                }
+
+                rowOffset += chunkRows;
+                // chunkData goes out of scope here, eligible for GC
+            }
         } catch (Exception e) {
             String errorMsg = "Failed to write data to MaxCompute table " + 
project + "." + tableName;
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         }
     }
 
+    /**
+     * Compute a safe number of rows to materialize at once based on available
+     * Java heap memory. For tables with large String columns, materializing 
all
+     * rows at once can cause OOM (e.g., 1790 rows × 600KB = 1.5GB heap spike).
+     *
+     * Strategy: materialize a small probe batch (10 rows), measure actual heap
+     * growth, then compute safe chunk size to fit within 1/4 of free heap.
+     */
+    private int computeSafeMaterializeRows(VectorTable inputTable, int 
numRows, int numCols) {
+        if (numRows == 0) {
+            return maxWriteBatchRows;
+        }
+
+        // Check if there are variable-width columns
+        boolean hasVarWidth = false;
+        for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
+            if (isVariableWidthType(columnTypeInfos.get(col).getOdpsType())) {
+                hasVarWidth = true;
+                break;
+            }
+        }
+        if (!hasVarWidth) {
+            return maxWriteBatchRows;
+        }
+
+        // Probe: materialize a small sample to measure actual heap cost per 
row
+        int probeRows = Math.min(10, numRows);
+        Runtime rt = Runtime.getRuntime();
+        rt.gc(); // Request GC to get a cleaner measurement
+        long heapBefore = rt.totalMemory() - rt.freeMemory();

Review Comment:
   **Suggestion**: Calling `rt.gc()` in a hot path is generally inadvisable — 
it can cause unpredictable pauses and doesn't guarantee collection. Since the 
probe itself is dead code (see comment on heapBefore/heapAfter), this `gc()` 
call adds latency for no benefit. If the probe is fixed to actually materialize 
data, consider whether the GC pause is acceptable for the measurement accuracy 
gained.
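   
   If the probe is kept, one possible way to avoid the `gc()` call is to 
measure per-thread allocation instead of heap deltas. This is only a sketch of 
an alternative, not something the PR does: it relies on the HotSpot-specific 
`com.sun.management.ThreadMXBean`, the class name `AllocationProbe` is 
hypothetical, and it counts bytes allocated rather than bytes retained, so it 
gives an upper bound on the heap cost of the probed work.

```java
import java.lang.management.ManagementFactory;

final class AllocationProbe {
    /**
     * Returns the bytes allocated by the current thread while running 'task',
     * or -1 if this JVM does not expose per-thread allocation counters.
     */
    static long allocatedBytes(Runnable task) {
        java.lang.management.ThreadMXBean base = ManagementFactory.getThreadMXBean();
        if (!(base instanceof com.sun.management.ThreadMXBean)) {
            return -1L; // not a HotSpot-style JVM
        }
        com.sun.management.ThreadMXBean bean = (com.sun.management.ThreadMXBean) base;
        if (!bean.isThreadAllocatedMemorySupported()
                || !bean.isThreadAllocatedMemoryEnabled()) {
            return -1L;
        }
        long id = Thread.currentThread().getId();
        long before = bean.getThreadAllocatedBytes(id);
        task.run();
        long after = bean.getThreadAllocatedBytes(id);
        return (before < 0 || after < 0) ? -1L : after - before;
    }
}
```

   A caller would fall back to the 1MB/row estimate whenever this returns -1.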



##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java:
##########
@@ -234,55 +250,166 @@ protected int getNext() throws IOException {
         return readVectors(expectedRows);
     }
 
+    private VectorSchemaRoot getNextBatch() throws IOException {
+        try {
+            if (!currentSplitReader.hasNext()) {
+                currentSplitReader.close();
+                currentSplitReader = null;
+                return null;
+            }
+            return currentSplitReader.get();
+        } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner readVectors get batch 
fail";
+            LOG.warn(errorMsg, e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
     private int readVectors(int expectedRows) throws IOException {
         int curReadRows = 0;
+        long accumulatedBytes = 0;
         while (curReadRows < expectedRows) {
-            try {
-                if (!currentSplitReader.hasNext()) {
-                    currentSplitReader.close();
-                    currentSplitReader = null;
+            // Stop early if accumulated variable-width bytes approach int32 
limit
+            if (accumulatedBytes >= MAX_BATCH_BYTES) {
+                break;
+            }
+            if (currentBatch == null) {
+                currentBatch = getNextBatch();
+                if (currentBatch == null || currentBatch.getRowCount() == 0) {
+                    currentBatch = null;
                     break;
                 }
-            } catch (Exception e) {
-                String errorMsg = "MaxComputeJniScanner readVectors hasNext 
fail";
-                LOG.warn(errorMsg, e);
-                throw new IOException(e.getMessage(), e);
+                currentBatchRowOffset = 0;
             }
-
             try {
-                VectorSchemaRoot data = currentSplitReader.get();
-                if (data.getRowCount() == 0) {
+                int rowsToAppend = Math.min(expectedRows - curReadRows,
+                        currentBatch.getRowCount() - currentBatchRowOffset);
+                List<FieldVector> fieldVectors = 
currentBatch.getFieldVectors();
+
+                // Limit rows to avoid int32 overflow in VectorColumn's String 
byte buffer
+                rowsToAppend = limitRowsByVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend,
+                        MAX_BATCH_BYTES - accumulatedBytes);
+                if (rowsToAppend <= 0) {
                     break;
                 }
 
-                List<FieldVector> fieldVectors = data.getFieldVectors();
-                int batchRows = 0;
                 long startTime = System.nanoTime();
                 for (FieldVector column : fieldVectors) {
                     Integer readColumnId = 
readColumnsToId.get(column.getName());
-                    batchRows = column.getValueCount();
                     if (readColumnId == null) {
                         continue;
                     }
                     columnValue.reset(column);
-                    for (int j = 0; j < batchRows; j++) {
+                    for (int j = currentBatchRowOffset; j < 
currentBatchRowOffset + rowsToAppend; j++) {
                         columnValue.setColumnIdx(j);
                         appendData(readColumnId, columnValue);
                     }
                 }
                 appendDataTime += System.nanoTime() - startTime;
 
-                curReadRows += batchRows;
+                // Track bytes for the rows just appended
+                accumulatedBytes += estimateVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+                currentBatchRowOffset += rowsToAppend;
+                curReadRows += rowsToAppend;
+                if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+                    currentBatch = null;
+                    currentBatchRowOffset = 0;
+                }
             } catch (Exception e) {
                 String errorMsg = String.format("MaxComputeJniScanner Fail to 
read arrow data. "
                         + "curReadRows = {}, expectedRows = {}", curReadRows, 
expectedRows);
                 LOG.warn(errorMsg, e);
                 throw new RuntimeException(errorMsg, e);
             }
         }
+        if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows < 
expectedRows) {
+            LOG.debug("readVectors: returning " + curReadRows + " rows 
(limited by byte budget)"
+                    + ", totalVarWidthBytes=" + accumulatedBytes
+                    + ", expectedRows=" + expectedRows);
+        }
         return curReadRows;
     }
 
+    /**
+     * Limit the number of rows to append so that no single variable-width 
column
+     * exceeds the remaining byte budget. This prevents int32 overflow in
+     * VectorColumn's appendIndex for String/Binary child byte arrays.
+     *
+     * Uses Arrow's offset buffer for O(1)-per-row byte size calculation —
+     * no data copying involved.
+     */
+    private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int maxRows, long remainingBudget) {
+        if (remainingBudget <= 0) {
+            return 0;
+        }
+        int safeRows = maxRows;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                // Find how many rows fit within the budget for THIS column
+                int rows = findMaxRowsWithinBudget(vec, offset, maxRows, 
remainingBudget);
+                safeRows = Math.min(safeRows, rows);
+            }
+        }
+        // Always allow at least 1 row to make progress, even if it exceeds 
budget
+        return Math.max(1, safeRows);
+    }
+
+    /**
+     * Binary search for the maximum number of rows starting at 'offset'
+     * whose total bytes in the variable-width vector fit within 'budget'.
+     */
+    private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+            int offset, int maxRows, long budget) {
+        if (maxRows <= 0) {
+            return 0;
+        }
+        // Total bytes for all maxRows
+        long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
maxRows) * 4)
+                - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        // Binary search for the cutoff point
+        int lo = 1;
+        int hi = maxRows - 1;
+        int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+        while (lo <= hi) {
+            int mid = lo + (hi - lo) / 2;
+            long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
mid) * 4) - startOff;
+            if (bytes <= budget) {
+                lo = mid + 1;
+            } else {
+                hi = mid - 1;
+            }
+        }
+        // 'hi' is the largest count whose bytes <= budget (could be 0)
+        return hi;
+    }

Review Comment:
   **Minor**: `findMaxRowsWithinBudget` can return 0, meaning that even a 
single row exceeds the budget. The caller handles this with 
`Math.max(1, safeRows)`, so the behavior is functionally correct.
   
   There are two ways to reach that result. When `maxRows == 1` and the 
fast-path check fails (`totalBytes > budget`), the search starts with 
`lo=1, hi=0`, the while loop never executes, and `hi=0` is returned. When 
`maxRows >= 2` and the first row alone exceeds the budget, the loop keeps 
setting `hi = mid - 1` until `hi = 0`. Documenting in the Javadoc that this 
method can return 0 would improve clarity.
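   
   The zero-return cases can be reproduced with a small standalone sketch. It 
mirrors the binary search from the diff, but a plain `int[]` of cumulative 
byte offsets stands in for Arrow's offset buffer; that substitution and the 
class name `RowBudgetSearch` are assumptions made for illustration.

```java
final class RowBudgetSearch {
    /**
     * Returns the largest n in [0, maxRows] such that
     * offsets[offset + n] - offsets[offset] <= budget.
     * Returns 0 when even the first row exceeds the budget;
     * callers are expected to clamp to at least 1 to make progress.
     */
    static int findMaxRowsWithinBudget(int[] offsets, int offset, int maxRows, long budget) {
        if (maxRows <= 0) {
            return 0;
        }
        long start = offsets[offset];
        // Fast path: all maxRows rows fit.
        if ((long) offsets[offset + maxRows] - start <= budget) {
            return maxRows;
        }
        // Binary search over row counts 1..maxRows-1.
        int lo = 1;
        int hi = maxRows - 1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            long bytes = (long) offsets[offset + mid] - start;
            if (bytes <= budget) {
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        // hi is the largest count whose bytes fit in the budget (possibly 0).
        return hi;
    }
}
```

   With offsets `{0, 10, 30, 60, 100}` (row sizes 10, 20, 30, 40), a budget of 
5 yields 0 for both `maxRows = 4` and `maxRows = 1`, which is the case the 
caller's `Math.max(1, ...)` covers.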



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

