github-actions[bot] commented on code in PR #61245:
URL: https://github.com/apache/doris/pull/61245#discussion_r2971212458
##########
be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp:
##########
@@ -171,7 +171,40 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
_partitions_to_writers.insert({partition_key, writer});
it = _partitions_to_writers.find(partition_key);
}
- return it->second->write(output_block);
+ return _write_block_in_chunks(it->second, output_block);
+}
+
+Status VMCTableWriter::_write_block_in_chunks(const
std::shared_ptr<VMCPartitionWriter>& writer,
+ Block& output_block) {
+ // Limit per-JNI data to MAX_WRITE_BLOCK_BYTES. When data source is not MC
scanner
+ // (e.g. Doris internal table, Hive, JDBC), the upstream batch_size
controls Block
+ // row count but not byte size. With large rows (585KB/row), a 4096-row
Block is
+ // ~2.4GB. Splitting ensures each JNI call processes bounded data,
limiting Arrow
+ // and SDK native memory per call.
+ static constexpr size_t MAX_WRITE_BLOCK_BYTES = 256 * 1024 * 1024; // 256MB
+
+ const size_t block_bytes = output_block.allocated_bytes();
+ const size_t rows = output_block.rows();
Review Comment:
**[Minor] `allocated_bytes()` overestimates actual data size.**
`Block::allocated_bytes()` reports total allocated capacity (including
unused buffer space from geometric growth). This can be significantly larger
than actual data size (`bytes()`). The effect is that `bytes_per_row` is
inflated, causing smaller chunks and more JNI round-trips than strictly
necessary.
Consider using `output_block.bytes()` instead for a tighter estimate, or
document that this is intentionally conservative.
##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java:
##########
@@ -234,55 +253,166 @@ protected int getNext() throws IOException {
return readVectors(expectedRows);
}
+ private VectorSchemaRoot getNextBatch() throws IOException {
+ try {
+ if (!currentSplitReader.hasNext()) {
+ currentSplitReader.close();
+ currentSplitReader = null;
+ return null;
+ }
+ return currentSplitReader.get();
+ } catch (Exception e) {
+ String errorMsg = "MaxComputeJniScanner readVectors get batch
fail";
+ LOG.warn(errorMsg, e);
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
private int readVectors(int expectedRows) throws IOException {
int curReadRows = 0;
+ long accumulatedBytes = 0;
while (curReadRows < expectedRows) {
- try {
- if (!currentSplitReader.hasNext()) {
- currentSplitReader.close();
- currentSplitReader = null;
+ // Stop early if accumulated variable-width bytes approach int32
limit
+ if (accumulatedBytes >= MAX_BATCH_BYTES) {
+ break;
+ }
+ if (currentBatch == null) {
+ currentBatch = getNextBatch();
+ if (currentBatch == null || currentBatch.getRowCount() == 0) {
+ currentBatch = null;
break;
}
- } catch (Exception e) {
- String errorMsg = "MaxComputeJniScanner readVectors hasNext
fail";
- LOG.warn(errorMsg, e);
- throw new IOException(e.getMessage(), e);
+ currentBatchRowOffset = 0;
}
-
try {
- VectorSchemaRoot data = currentSplitReader.get();
- if (data.getRowCount() == 0) {
+ int rowsToAppend = Math.min(expectedRows - curReadRows,
+ currentBatch.getRowCount() - currentBatchRowOffset);
+ List<FieldVector> fieldVectors =
currentBatch.getFieldVectors();
+
+ // Limit rows to avoid int32 overflow in VectorColumn's String
byte buffer
+ rowsToAppend = limitRowsByVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend,
+ MAX_BATCH_BYTES - accumulatedBytes);
+ if (rowsToAppend <= 0) {
break;
}
- List<FieldVector> fieldVectors = data.getFieldVectors();
- int batchRows = 0;
long startTime = System.nanoTime();
for (FieldVector column : fieldVectors) {
Integer readColumnId =
readColumnsToId.get(column.getName());
- batchRows = column.getValueCount();
if (readColumnId == null) {
continue;
}
columnValue.reset(column);
- for (int j = 0; j < batchRows; j++) {
+ for (int j = currentBatchRowOffset; j <
currentBatchRowOffset + rowsToAppend; j++) {
columnValue.setColumnIdx(j);
appendData(readColumnId, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;
- curReadRows += batchRows;
+ // Track bytes for the rows just appended
+ accumulatedBytes += estimateVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+ currentBatchRowOffset += rowsToAppend;
+ curReadRows += rowsToAppend;
+ if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+ currentBatch = null;
+ currentBatchRowOffset = 0;
+ }
} catch (Exception e) {
String errorMsg = String.format("MaxComputeJniScanner Fail to
read arrow data. "
+ "curReadRows = {}, expectedRows = {}", curReadRows,
expectedRows);
LOG.warn(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
}
+ if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows <
expectedRows) {
+ LOG.debug("readVectors: returning " + curReadRows + " rows
(limited by byte budget)"
+ + ", totalVarWidthBytes=" + accumulatedBytes
+ + ", expectedRows=" + expectedRows);
+ }
return curReadRows;
}
+ /**
+ * Limit the number of rows to append so that no single variable-width
column
+ * exceeds the remaining byte budget. This prevents int32 overflow in
+ * VectorColumn's appendIndex for String/Binary child byte arrays.
+ *
+ * Uses Arrow's offset buffer for O(1)-per-row byte size calculation —
+ * no data copying involved.
+ */
+ private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+ int offset, int maxRows, long remainingBudget) {
+ if (remainingBudget <= 0) {
+ return 0;
+ }
+ int safeRows = maxRows;
+ for (FieldVector fv : fieldVectors) {
+ if (fv instanceof BaseVariableWidthVector) {
+ BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+ // Find how many rows fit within the budget for THIS column
+ int rows = findMaxRowsWithinBudget(vec, offset, maxRows,
remainingBudget);
+ safeRows = Math.min(safeRows, rows);
+ }
+ }
+ // Always allow at least 1 row to make progress, even if it exceeds
budget
+ return Math.max(1, safeRows);
+ }
+
+ /**
+ * Binary search for the maximum number of rows starting at 'offset'
+ * whose total bytes in the variable-width vector fit within 'budget'.
+ */
+ private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+ int offset, int maxRows, long budget) {
+ if (maxRows <= 0) {
+ return 0;
+ }
+ // Total bytes for all maxRows
+ long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
maxRows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+ if (totalBytes <= budget) {
+ return maxRows;
+ }
+ // Binary search for the cutoff point
+ int lo = 1;
+ int hi = maxRows - 1;
+ int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+ while (lo <= hi) {
+ int mid = lo + (hi - lo) / 2;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
mid) * 4) - startOff;
+ if (bytes <= budget) {
+ lo = mid + 1;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ // 'hi' is the largest count whose bytes <= budget (could be 0)
+ return hi;
+ }
+
+ /**
+ * Estimate total variable-width bytes for the given row range across all
columns.
+ * Returns the max bytes of any single column (since each column has its
own
+ * VectorColumn child buffer and the overflow is per-column).
+ */
+ private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+ int offset, int rows) {
+ long maxColumnBytes = 0;
+ for (FieldVector fv : fieldVectors) {
+ if (fv instanceof BaseVariableWidthVector) {
+ BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long)
(offset + rows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset *
4);
+ maxColumnBytes = Math.max(maxColumnBytes, bytes);
+ }
Review Comment:
**[Minor] `estimateVarWidthBytes` returns per-column max, not total sum.**
This function returns `Math.max` across columns, but it's used as
`accumulatedBytes += estimateVarWidthBytes(...)` in `readVectors`. If a batch
has multiple large variable-width columns, the accumulated bytes will only
track the largest single column's bytes, not the total across all columns. With
5 STRING columns each at 200MB, `accumulatedBytes` would track 200MB while
actual memory is 1GB.
The Javadoc correctly explains this is intentional for per-column int32
overflow prevention. But the `MAX_BATCH_BYTES` comment (line 67) describes it
as a memory budget for queue control: _"256MB keeps queue memory manageable: 5
instances * 3 * 256MB = 3.8GB"_. These two framings are inconsistent — consider
updating the comment to clarify the budget is per-column, not aggregate.
##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -210,37 +239,360 @@ protected void writeInternal(VectorTable inputTable)
throws IOException {
}
try {
- Object[][] data = inputTable.getMaterializedData();
+ // Stream data directly from off-heap VectorColumn to Arrow
vectors.
+ // Unlike the previous getMaterializedData() approach that created
+ // Object[][] (with String objects for STRING columns causing 3x
memory
+ // amplification), this reads bytes directly from VectorColumn and
writes
+ // to Arrow, keeping peak heap usage per batch to O(batch_rows *
row_size)
+ // instead of O(2 * batch_rows * row_size).
+ int rowOffset = 0;
+ while (rowOffset < numRows) {
+ int batchRows = Math.min(maxWriteBatchRows, numRows -
rowOffset);
- // Get a pre-allocated VectorSchemaRoot from the batch writer
- VectorSchemaRoot root = batchWriter.newElement();
- root.setRowCount(numRows);
+ // For variable-width columns, check byte budget to avoid
Arrow int32 overflow
+ batchRows = limitWriteBatchByBytesStreaming(inputTable,
numCols,
+ rowOffset, batchRows);
- for (int col = 0; col < numCols && col < columnTypeInfos.size();
col++) {
- OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
- fillArrowVector(root, col, odpsType, data[col], numRows);
- }
+ VectorSchemaRoot root = batchWriter.newElement();
+ try {
+ root.setRowCount(batchRows);
- batchWriter.write(root);
- writtenRows += numRows;
+ for (int col = 0; col < numCols && col <
columnTypeInfos.size(); col++) {
+ OdpsType odpsType =
columnTypeInfos.get(col).getOdpsType();
+ fillArrowVectorStreaming(root, col, odpsType,
+ inputTable.getColumn(col), rowOffset,
batchRows);
+ }
+
+ batchWriter.write(root);
+ } finally {
+ root.close();
+ }
+
+ writtenRows += batchRows;
+ segmentRows += batchRows;
+ rowOffset += batchRows;
+
+ // Segmented commit: rotate batchWriter to release SDK native
memory
+ if (segmentRows >= ROWS_PER_SEGMENT) {
+ rotateBatchWriter();
+ }
+ }
} catch (Exception e) {
String errorMsg = "Failed to write data to MaxCompute table " +
project + "." + tableName;
LOG.error(errorMsg, e);
throw new IOException(errorMsg, e);
}
}
+ /**
+ * Commit current batchWriter and create a new one with a fresh blockId.
+ * This forces the MaxCompute SDK to flush and release internal native
memory
+ * buffers that accumulate during writes. Without rotation, the SDK holds
all
+ * serialized Arrow data in native memory until close(), causing process
RSS
+ * to grow linearly with total data volume.
+ */
+ private void rotateBatchWriter() throws IOException {
+ try {
+ // 1. Commit current batchWriter and save its commit message
+ WriterCommitMessage msg = batchWriter.commit();
+ commitMessages.add(msg);
+ batchWriter = null;
+
+ // 2. Close current allocator to release Arrow native memory
+ allocator.close();
+ allocator = null;
+
+ // 3. Create new allocator and batchWriter with a new blockId
+ long newBlockId = nextBlockId++;
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ batchWriter = writeSession.createArrowWriter(newBlockId,
+ WriterAttemptId.of(0), writerOptions);
+
+ LOG.info("Rotated batchWriter: oldBlockId=" + blockId + ",
newBlockId=" + newBlockId
+ + ", totalCommitMessages=" + commitMessages.size()
+ + ", totalWrittenRows=" + writtenRows);
+
+ blockId = newBlockId;
+ segmentRows = 0;
+ } catch (Exception e) {
+ throw new IOException("Failed to rotate batchWriter for table "
+ + project + "." + tableName, e);
+ }
+ }
+
+
+ private boolean isVariableWidthType(OdpsType type) {
+ return type == OdpsType.STRING || type == OdpsType.VARCHAR
+ || type == OdpsType.CHAR || type == OdpsType.BINARY;
+ }
+
+ /**
+ * Limit write batch size by estimating variable-width column bytes
directly
+ * from the off-heap VectorColumn, without materializing data to Java heap.
+ */
+ private int limitWriteBatchByBytesStreaming(VectorTable inputTable, int
numCols,
+ int rowOffset, int batchRows) {
+ for (int col = 0; col < numCols && col < columnTypeInfos.size();
col++) {
+ OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
+ if (!isVariableWidthType(odpsType)) {
+ continue;
+ }
+ VectorColumn vc = inputTable.getColumn(col);
+ batchRows = findMaxRowsForColumnStreaming(vc, rowOffset,
batchRows, MAX_ARROW_BATCH_BYTES);
+ if (batchRows <= 1) {
+ return Math.max(1, batchRows);
+ }
+ }
+ return batchRows;
+ }
+
+ /**
+ * Find the maximum number of rows (from rowOffset) whose total byte size
+ * fits within budget, by reading offset metadata directly from
VectorColumn.
+ */
+ private int findMaxRowsForColumnStreaming(VectorColumn vc, int rowOffset,
int maxRows, long budget) {
+ long totalBytes = estimateColumnBytesStreaming(vc, rowOffset, maxRows);
+ if (totalBytes <= budget) {
+ return maxRows;
+ }
+ int rows = maxRows;
+ while (rows > 1) {
+ rows = rows / 2;
+ totalBytes = estimateColumnBytesStreaming(vc, rowOffset, rows);
+ if (totalBytes <= budget) {
+ int lo = rows;
+ int hi = Math.min(rows * 2, maxRows);
+ while (lo < hi) {
+ int mid = lo + (hi - lo + 1) / 2;
+ if (estimateColumnBytesStreaming(vc, rowOffset, mid) <=
budget) {
+ lo = mid;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ return lo;
+ }
+ }
+ return 1;
+ }
+
+ /**
+ * Estimate total bytes for a range of rows in a VectorColumn by reading
+ * the offset array directly from off-heap memory, without creating any
+ * byte[] objects. This is O(1) per row (just offset subtraction).
+ */
+ private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset,
int rows) {
+ long total = 0;
+ long offsetAddr = vc.offsetAddress();
Review Comment:
**[Performance] O(rows) per-row iteration when O(1) range computation is
possible.**
This function iterates row-by-row, checking nulls and computing individual
offsets. However, just as the scanner's `findMaxRowsWithinBudget` computes a
range's byte size in O(1) via `vec.getOffsetBuffer().getInt((offset + N) * 4) -
vec.getOffsetBuffer().getInt(offset * 4)`, the total range bytes could be
computed in O(1) here as well:
```java
private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset,
int rows) {
long offsetAddr = vc.offsetAddress();
int startOff = rowOffset == 0 ? 0
: OffHeap.getInt(null, offsetAddr + 4L * (rowOffset - 1));
int endOff = OffHeap.getInt(null, offsetAddr + 4L * (rowOffset + rows -
1));
return endOff - startOff;
}
```
This would include null values' bytes in the estimate (since VectorColumn
offsets are contiguous regardless of nulls), but that's a safe overestimate.
The current O(rows) approach is called O(log N) times by the binary search in
`findMaxRowsForColumnStreaming`, making the total cost O(N log N) instead of
O(log N). For `maxWriteBatchRows=4096` this is acceptable, but the optimization
is simple.
##########
fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java:
##########
@@ -127,10 +153,12 @@ public MaxComputeJniWriter(int batchSize, Map<String,
String> params) {
this.writeSessionId =
Objects.requireNonNull(params.get(WRITE_SESSION_ID),
"required property '" + WRITE_SESSION_ID + "'.");
this.blockId = Long.parseLong(params.getOrDefault(BLOCK_ID, "0"));
+ this.nextBlockId = this.blockId + 1; // Reserve blockId for first
writer, increment for segments
this.partitionSpec = params.getOrDefault(PARTITION_SPEC, "");
Review Comment:
**[Latent Bug] BlockId collision on segment rotation with multiple partition
writers.**
Setting `nextBlockId = blockId + 1` means the rotation ID space of one
writer overlaps with the initial blockId of the next writer created in the same
pipeline instance.
**Scenario:** Instance 0 creates two partition writers (blockIds 0 and 1 via
`_next_block_id.fetch_add(1)` in C++):
- Writer A: `blockId=0`, `nextBlockId=1`. On first rotation → uses blockId
**1**
- Writer B: `blockId=1`, `nextBlockId=2`. Initial batchWriter already uses
blockId **1**
- **Collision**: Writer A's rotated batchWriter and Writer B's initial
batchWriter both use blockId 1 in the same `writeSession`.
Currently safe because only one partition writer is created per
`VMCTableWriter` instance (static path uses one key, dynamic/non-partitioned
path uses empty string key). But this is a latent bug that will surface if
multi-partition-writer support is added.
**Suggested fix:** Have the C++ side allocate a sub-range of blockIds per
partition writer (e.g., reserve `BLOCK_ID_STRIDE / max_partitions` IDs per
writer), or pass the C++ atomic counter's next value to Java on each rotation
via callback. At minimum, add a comment documenting this constraint.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]