github-actions[bot] commented on code in PR #61245:
URL: https://github.com/apache/doris/pull/61245#discussion_r2971212458


##########
be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp:
##########
@@ -171,7 +171,40 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) {
         _partitions_to_writers.insert({partition_key, writer});
         it = _partitions_to_writers.find(partition_key);
     }
-    return it->second->write(output_block);
+    return _write_block_in_chunks(it->second, output_block);
+}
+
+Status VMCTableWriter::_write_block_in_chunks(const std::shared_ptr<VMCPartitionWriter>& writer,
+                                              Block& output_block) {
+    // Limit per-JNI data to MAX_WRITE_BLOCK_BYTES. When data source is not MC scanner
+    // (e.g. Doris internal table, Hive, JDBC), the upstream batch_size controls Block
+    // row count but not byte size. With large rows (585KB/row), a 4096-row Block is
+    // ~2.4GB. Splitting ensures each JNI call processes bounded data, limiting Arrow
+    // and SDK native memory per call.
+    static constexpr size_t MAX_WRITE_BLOCK_BYTES = 256 * 1024 * 1024; // 256MB
+
+    const size_t block_bytes = output_block.allocated_bytes();
+    const size_t rows = output_block.rows();

Review Comment:
   **[Minor] `allocated_bytes()` overestimates actual data size.**
   
   `Block::allocated_bytes()` reports total allocated capacity (including 
unused buffer space from geometric growth). This can be significantly larger 
than actual data size (`bytes()`). The effect is that `bytes_per_row` is 
inflated, causing smaller chunks and more JNI round-trips than strictly 
necessary.
   
   Consider using `output_block.bytes()` instead for a tighter estimate, or 
document that this is intentionally conservative.
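   A quick sketch of the impact (hypothetical figures; `ChunkEstimate` and its numbers are illustrative, not from the PR): with geometric buffer growth, allocated capacity can approach twice the actual data size, so deriving `bytes_per_row` from `allocated_bytes()` can roughly double the number of JNI chunks.

   ```java
   public class ChunkEstimate {
       static final long MAX_WRITE_BLOCK_BYTES = 256L * 1024 * 1024; // 256MB, as in the PR

       // Number of JNI chunks for a block, given the per-row byte figure used.
       static long chunkCount(long rows, long bytesPerRow) {
           long totalBytes = rows * bytesPerRow;
           return (totalBytes + MAX_WRITE_BLOCK_BYTES - 1) / MAX_WRITE_BLOCK_BYTES;
       }

       public static void main(String[] args) {
           long rows = 4096;
           long actualBytesPerRow = 585L * 1024;              // bytes() / rows, the 585KB/row example
           long allocatedBytesPerRow = 2 * actualBytesPerRow; // worst-case capacity after doubling growth
           System.out.println("chunks using bytes():           " + chunkCount(rows, actualBytesPerRow));
           System.out.println("chunks using allocated_bytes(): " + chunkCount(rows, allocatedBytesPerRow));
       }
   }
   ```

   The overestimate is harmless for correctness; the cost is only the extra JNI round-trips.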



##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java:
##########
@@ -234,55 +253,166 @@ protected int getNext() throws IOException {
         return readVectors(expectedRows);
     }
 
+    private VectorSchemaRoot getNextBatch() throws IOException {
+        try {
+            if (!currentSplitReader.hasNext()) {
+                currentSplitReader.close();
+                currentSplitReader = null;
+                return null;
+            }
+            return currentSplitReader.get();
+        } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner readVectors get batch fail";
+            LOG.warn(errorMsg, e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
     private int readVectors(int expectedRows) throws IOException {
         int curReadRows = 0;
+        long accumulatedBytes = 0;
         while (curReadRows < expectedRows) {
-            try {
-                if (!currentSplitReader.hasNext()) {
-                    currentSplitReader.close();
-                    currentSplitReader = null;
+            // Stop early if accumulated variable-width bytes approach int32 limit
+            if (accumulatedBytes >= MAX_BATCH_BYTES) {
+                break;
+            }
+            if (currentBatch == null) {
+                currentBatch = getNextBatch();
+                if (currentBatch == null || currentBatch.getRowCount() == 0) {
+                    currentBatch = null;
                     break;
                 }
-            } catch (Exception e) {
-                String errorMsg = "MaxComputeJniScanner readVectors hasNext fail";
-                LOG.warn(errorMsg, e);
-                throw new IOException(e.getMessage(), e);
+                currentBatchRowOffset = 0;
             }
-
             try {
-                VectorSchemaRoot data = currentSplitReader.get();
-                if (data.getRowCount() == 0) {
+                int rowsToAppend = Math.min(expectedRows - curReadRows,
+                        currentBatch.getRowCount() - currentBatchRowOffset);
+                List<FieldVector> fieldVectors = currentBatch.getFieldVectors();
+
+                // Limit rows to avoid int32 overflow in VectorColumn's String byte buffer
+                rowsToAppend = limitRowsByVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend,
+                        MAX_BATCH_BYTES - accumulatedBytes);
+                if (rowsToAppend <= 0) {
                     break;
                 }
 
-                List<FieldVector> fieldVectors = data.getFieldVectors();
-                int batchRows = 0;
                 long startTime = System.nanoTime();
                 for (FieldVector column : fieldVectors) {
                     Integer readColumnId = readColumnsToId.get(column.getName());
-                    batchRows = column.getValueCount();
                     if (readColumnId == null) {
                         continue;
                     }
                     columnValue.reset(column);
-                    for (int j = 0; j < batchRows; j++) {
+                    for (int j = currentBatchRowOffset; j < currentBatchRowOffset + rowsToAppend; j++) {
                         columnValue.setColumnIdx(j);
                         appendData(readColumnId, columnValue);
                     }
                 }
                 appendDataTime += System.nanoTime() - startTime;
 
-                curReadRows += batchRows;
+                // Track bytes for the rows just appended
+                accumulatedBytes += estimateVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+                currentBatchRowOffset += rowsToAppend;
+                curReadRows += rowsToAppend;
+                if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+                    currentBatch = null;
+                    currentBatchRowOffset = 0;
+                }
             } catch (Exception e) {
                String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. "
                        + "curReadRows = {}, expectedRows = {}", curReadRows, expectedRows);
                 LOG.warn(errorMsg, e);
                 throw new RuntimeException(errorMsg, e);
             }
         }
+        if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows < expectedRows) {
+            LOG.debug("readVectors: returning " + curReadRows + " rows (limited by byte budget)"
+                    + ", totalVarWidthBytes=" + accumulatedBytes
+                    + ", expectedRows=" + expectedRows);
+        }
         return curReadRows;
     }
 
+    /**
+     * Limit the number of rows to append so that no single variable-width column
+     * exceeds the remaining byte budget. This prevents int32 overflow in
+     * VectorColumn's appendIndex for String/Binary child byte arrays.
+     *
+     * Uses Arrow's offset buffer for O(1)-per-row byte size calculation;
+     * no data copying involved.
+     */
+    private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int maxRows, long remainingBudget) {
+        if (remainingBudget <= 0) {
+            return 0;
+        }
+        int safeRows = maxRows;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                // Find how many rows fit within the budget for THIS column
+                int rows = findMaxRowsWithinBudget(vec, offset, maxRows, remainingBudget);
+                safeRows = Math.min(safeRows, rows);
+            }
+        }
+        // Always allow at least 1 row to make progress, even if it exceeds budget
+        return Math.max(1, safeRows);
+    }
+
+    /**
+     * Binary search for the maximum number of rows starting at 'offset'
+     * whose total bytes in the variable-width vector fit within 'budget'.
+     */
+    private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+            int offset, int maxRows, long budget) {
+        if (maxRows <= 0) {
+            return 0;
+        }
+        // Total bytes for all maxRows
+        long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset + maxRows) * 4)
+                - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        // Binary search for the cutoff point
+        int lo = 1;
+        int hi = maxRows - 1;
+        int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+        while (lo <= hi) {
+            int mid = lo + (hi - lo) / 2;
+            long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + mid) * 4) - startOff;
+            if (bytes <= budget) {
+                lo = mid + 1;
+            } else {
+                hi = mid - 1;
+            }
+        }
+        // 'hi' is the largest count whose bytes <= budget (could be 0)
+        return hi;
+    }
+
+    /**
+     * Estimate total variable-width bytes for the given row range across all columns.
+     * Returns the max bytes of any single column (since each column has its own
+     * VectorColumn child buffer and the overflow is per-column).
+     */
+    private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int rows) {
+        long maxColumnBytes = 0;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + rows) * 4)
+                        - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+                maxColumnBytes = Math.max(maxColumnBytes, bytes);
+            }

Review Comment:
   **[Minor] `estimateVarWidthBytes` returns per-column max, not total sum.**
   
   This function returns `Math.max` across columns, but it's used as 
`accumulatedBytes += estimateVarWidthBytes(...)` in `readVectors`. If a batch 
has multiple large variable-width columns, the accumulated bytes will only 
track the largest single column's bytes, not the total across all columns. With 
5 STRING columns each at 200MB, `accumulatedBytes` would track 200MB while 
actual memory is 1GB.
   
   The Javadoc correctly explains this is intentional for per-column int32 
overflow prevention. But the `MAX_BATCH_BYTES` comment (line 67) describes it 
as a memory budget for queue control: _"256MB keeps queue memory manageable: 5 
instances * 3 * 256MB = 3.8GB"_. These two framings are inconsistent — consider 
updating the comment to clarify the budget is per-column, not aggregate.
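   The gap between the two framings can be made concrete with a standalone sketch (plain `int[]` offset arrays stand in for Arrow offset buffers; all names here are illustrative, not from the PR):

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class VarWidthEstimate {
       // Bytes covered by rows [offset, offset + rows) in one offset array.
       static long rangeBytes(int[] offsets, int offset, int rows) {
           return (long) offsets[offset + rows] - offsets[offset];
       }

       // Per-column max, mirroring what estimateVarWidthBytes returns.
       static long maxAcrossColumns(List<int[]> columns, int offset, int rows) {
           long max = 0;
           for (int[] offsets : columns) {
               max = Math.max(max, rangeBytes(offsets, offset, rows));
           }
           return max;
       }

       // Aggregate bytes actually held across all columns.
       static long sumAcrossColumns(List<int[]> columns, int offset, int rows) {
           long sum = 0;
           for (int[] offsets : columns) {
               sum += rangeBytes(offsets, offset, rows);
           }
           return sum;
       }

       public static void main(String[] args) {
           // Five STRING columns, one row of 200MB each.
           int[] col = {0, 200_000_000};
           List<int[]> columns = Arrays.asList(col, col, col, col, col);
           System.out.println("tracked (max): " + maxAcrossColumns(columns, 0, 1)); // 200MB
           System.out.println("actual  (sum): " + sumAcrossColumns(columns, 0, 1)); // 1GB
       }
   }
   ```

   The max is the right guard against per-column int32 overflow; only the memory-budget reading of `MAX_BATCH_BYTES` is off by the column count.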



##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -210,37 +239,360 @@ protected void writeInternal(VectorTable inputTable) throws IOException {
         }
 
         try {
-            Object[][] data = inputTable.getMaterializedData();
+            // Stream data directly from off-heap VectorColumn to Arrow vectors.
+            // Unlike the previous getMaterializedData() approach that created
+            // Object[][] (with String objects for STRING columns causing 3x memory
+            // amplification), this reads bytes directly from VectorColumn and writes
+            // to Arrow, keeping peak heap usage per batch to O(batch_rows * row_size)
+            // instead of O(2 * batch_rows * row_size).
+            int rowOffset = 0;
+            while (rowOffset < numRows) {
+                int batchRows = Math.min(maxWriteBatchRows, numRows - rowOffset);
 
-            // Get a pre-allocated VectorSchemaRoot from the batch writer
-            VectorSchemaRoot root = batchWriter.newElement();
-            root.setRowCount(numRows);
+                // For variable-width columns, check byte budget to avoid Arrow int32 overflow
+                batchRows = limitWriteBatchByBytesStreaming(inputTable, numCols,
+                        rowOffset, batchRows);
 
-            for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) {
-                OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
-                fillArrowVector(root, col, odpsType, data[col], numRows);
-            }
+                VectorSchemaRoot root = batchWriter.newElement();
+                try {
+                    root.setRowCount(batchRows);
 
-            batchWriter.write(root);
-            writtenRows += numRows;
+                    for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) {
+                        OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
+                        fillArrowVectorStreaming(root, col, odpsType,
+                                inputTable.getColumn(col), rowOffset, batchRows);
+                    }
+
+                    batchWriter.write(root);
+                } finally {
+                    root.close();
+                }
+
+                writtenRows += batchRows;
+                segmentRows += batchRows;
+                rowOffset += batchRows;
+
+                // Segmented commit: rotate batchWriter to release SDK native memory
+                if (segmentRows >= ROWS_PER_SEGMENT) {
+                    rotateBatchWriter();
+                }
+            }
         } catch (Exception e) {
            String errorMsg = "Failed to write data to MaxCompute table " + project + "." + tableName;
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         }
     }
 
+    /**
+     * Commit current batchWriter and create a new one with a fresh blockId.
+     * This forces the MaxCompute SDK to flush and release internal native memory
+     * buffers that accumulate during writes. Without rotation, the SDK holds all
+     * serialized Arrow data in native memory until close(), causing process RSS
+     * to grow linearly with total data volume.
+     */
+    private void rotateBatchWriter() throws IOException {
+        try {
+            // 1. Commit current batchWriter and save its commit message
+            WriterCommitMessage msg = batchWriter.commit();
+            commitMessages.add(msg);
+            batchWriter = null;
+
+            // 2. Close current allocator to release Arrow native memory
+            allocator.close();
+            allocator = null;
+
+            // 3. Create new allocator and batchWriter with a new blockId
+            long newBlockId = nextBlockId++;
+            allocator = new RootAllocator(Long.MAX_VALUE);
+            batchWriter = writeSession.createArrowWriter(newBlockId,
+                    WriterAttemptId.of(0), writerOptions);
+
+            LOG.info("Rotated batchWriter: oldBlockId=" + blockId + ", newBlockId=" + newBlockId
+                    + ", totalCommitMessages=" + commitMessages.size()
+                    + ", totalWrittenRows=" + writtenRows);
+
+            blockId = newBlockId;
+            segmentRows = 0;
+        } catch (Exception e) {
+            throw new IOException("Failed to rotate batchWriter for table "
+                    + project + "." + tableName, e);
+        }
+    }
+
+
+    private boolean isVariableWidthType(OdpsType type) {
+        return type == OdpsType.STRING || type == OdpsType.VARCHAR
+                || type == OdpsType.CHAR || type == OdpsType.BINARY;
+    }
+
+    /**
+     * Limit write batch size by estimating variable-width column bytes directly
+     * from the off-heap VectorColumn, without materializing data to Java heap.
+     */
+    private int limitWriteBatchByBytesStreaming(VectorTable inputTable, int numCols,
+                                               int rowOffset, int batchRows) {
+        for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) {
+            OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
+            if (!isVariableWidthType(odpsType)) {
+                continue;
+            }
+            VectorColumn vc = inputTable.getColumn(col);
+            batchRows = findMaxRowsForColumnStreaming(vc, rowOffset, batchRows, MAX_ARROW_BATCH_BYTES);
+            if (batchRows <= 1) {
+                return Math.max(1, batchRows);
+            }
+        }
+        return batchRows;
+    }
+
+    /**
+     * Find the maximum number of rows (from rowOffset) whose total byte size
+     * fits within budget, by reading offset metadata directly from VectorColumn.
+     */
+    private int findMaxRowsForColumnStreaming(VectorColumn vc, int rowOffset, int maxRows, long budget) {
+        long totalBytes = estimateColumnBytesStreaming(vc, rowOffset, maxRows);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        int rows = maxRows;
+        while (rows > 1) {
+            rows = rows / 2;
+            totalBytes = estimateColumnBytesStreaming(vc, rowOffset, rows);
+            if (totalBytes <= budget) {
+                int lo = rows;
+                int hi = Math.min(rows * 2, maxRows);
+                while (lo < hi) {
+                    int mid = lo + (hi - lo + 1) / 2;
+                    if (estimateColumnBytesStreaming(vc, rowOffset, mid) <= budget) {
+                        lo = mid;
+                    } else {
+                        hi = mid - 1;
+                    }
+                }
+                return lo;
+            }
+        }
+        return 1;
+    }
+
+    /**
+     * Estimate total bytes for a range of rows in a VectorColumn by reading
+     * the offset array directly from off-heap memory, without creating any
+     * byte[] objects. This is O(1) per row (just offset subtraction).
+     */
+    private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset, int rows) {
+        long total = 0;
+        long offsetAddr = vc.offsetAddress();

Review Comment:
   **[Performance] O(rows) per-row iteration when O(1) range computation is 
possible.**
   
   This function iterates row-by-row, checking nulls and computing individual 
offsets. However, like the scanner's `findMaxRowsWithinBudget` which uses 
`vec.getOffsetBuffer().getInt((offset + N) * 4) - 
vec.getOffsetBuffer().getInt(offset * 4)` for O(1) range byte computation, you 
could compute the total range bytes in O(1) here too:
   
   ```java
   private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset, int rows) {
       long offsetAddr = vc.offsetAddress();
       int startOff = rowOffset == 0 ? 0
           : OffHeap.getInt(null, offsetAddr + 4L * (rowOffset - 1));
       int endOff = OffHeap.getInt(null, offsetAddr + 4L * (rowOffset + rows - 1));
       return endOff - startOff;
   }
   ```
   
   This would include null values' bytes in the estimate (since VectorColumn 
offsets are contiguous regardless of nulls), but that's a safe overestimate. 
The current O(rows) approach is called O(log N) times by the binary search in 
`findMaxRowsForColumnStreaming`, making the total cost O(N log N) instead of 
O(log N). For `maxWriteBatchRows=4096` this is acceptable, but the optimization 
is simple.
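   The equivalence rests on offsets being contiguous: the per-row lengths telescope, so the O(1) range difference equals the O(rows) sum exactly. A self-contained sketch (a plain Arrow-style end-exclusive `int[]` stands in for the off-heap offset buffer; names are illustrative):

   ```java
   public class RangeBytes {
       // O(rows): accumulate each row's length from consecutive offsets.
       static long perRow(int[] offsets, int rowOffset, int rows) {
           long total = 0;
           for (int i = rowOffset; i < rowOffset + rows; i++) {
               total += offsets[i + 1] - offsets[i];
           }
           return total;
       }

       // O(1): the per-row lengths telescope to a single subtraction.
       static long byRange(int[] offsets, int rowOffset, int rows) {
           return (long) offsets[rowOffset + rows] - offsets[rowOffset];
       }

       public static void main(String[] args) {
           int[] offsets = {0, 3, 3, 10, 14}; // 4 rows of lengths 3, 0, 7, 4
           System.out.println(perRow(offsets, 1, 3) == byRange(offsets, 1, 3)); // true
       }
   }
   ```

   Note the sketch uses Arrow's rows+1 offset convention; the VectorColumn layout in the suggestion above indexes from `rowOffset - 1`, but the telescoping argument is the same.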



##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -127,10 +153,12 @@ public MaxComputeJniWriter(int batchSize, Map<String, String> params) {
         this.writeSessionId = Objects.requireNonNull(params.get(WRITE_SESSION_ID),
                 "required property '" + WRITE_SESSION_ID + "'.");
         this.blockId = Long.parseLong(params.getOrDefault(BLOCK_ID, "0"));
+        this.nextBlockId = this.blockId + 1; // Reserve blockId for first writer, increment for segments
         this.partitionSpec = params.getOrDefault(PARTITION_SPEC, "");

Review Comment:
   **[Latent Bug] BlockId collision on segment rotation with multiple partition 
writers.**
   
   Setting `nextBlockId = blockId + 1` means the rotation ID space of one 
writer overlaps with the initial blockId of the next writer created in the same 
pipeline instance.
   
   **Scenario:** Instance 0 creates two partition writers (blockIds 0 and 1 via 
`_next_block_id.fetch_add(1)` in C++):
   - Writer A: `blockId=0`, `nextBlockId=1`. On first rotation → uses blockId 
**1**
   - Writer B: `blockId=1`, `nextBlockId=2`. Initial batchWriter already uses 
blockId **1**
   - **Collision**: Writer A's rotated batchWriter and Writer B's initial 
batchWriter both use blockId 1 in the same `writeSession`.
   
   Currently safe because only one partition writer is created per 
`VMCTableWriter` instance (static path uses one key, dynamic/non-partitioned 
path uses empty string key). But this is a latent bug that will surface if 
multi-partition-writer support is added.
   
   **Suggested fix:** Have the C++ side allocate a sub-range of blockIds per 
partition writer (e.g., reserve `BLOCK_ID_STRIDE / max_partitions` IDs per 
writer), or pass the C++ atomic counter's next value to Java on each rotation 
via callback. At minimum, add a comment documenting this constraint.
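   One possible shape for the sub-range option (a sketch only; `BlockIdRange`, `stride`, and `writerIndex` are illustrative names, not APIs in this PR):

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   public class BlockIdRange {
       private final long base;
       private final long stride;
       private final AtomicLong next = new AtomicLong();

       // Each partition writer owns [writerIndex * stride, (writerIndex + 1) * stride),
       // so rotations inside one writer can never collide with another writer's ids.
       BlockIdRange(long writerIndex, long stride) {
           this.base = writerIndex * stride;
           this.stride = stride;
       }

       long nextBlockId() {
           long n = next.getAndIncrement();
           if (n >= stride) {
               throw new IllegalStateException("blockId range exhausted for this writer");
           }
           return base + n;
       }

       public static void main(String[] args) {
           BlockIdRange writerA = new BlockIdRange(0, 1024);
           BlockIdRange writerB = new BlockIdRange(1, 1024);
           System.out.println(writerA.nextBlockId()); // initial batchWriter
           System.out.println(writerA.nextBlockId()); // first rotation, still inside A's range
           System.out.println(writerB.nextBlockId()); // disjoint from A regardless of rotations
       }
   }
   ```

   Under this scheme the C++ side would hand each partition writer a `writerIndex` instead of a raw blockId, and rotation stays collision-free without any cross-language callback.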



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

