zhengchenyu commented on code in PR #1666:
URL:
https://github.com/apache/incubator-uniffle/pull/1666#discussion_r1582706355
##########
client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java:
##########
@@ -341,22 +341,57 @@ public SortBufferIterator(SortWriteBuffer<K, V>
sortWriteBuffer) {
this.iterator = sortWriteBuffer.records.iterator();
}
+ private byte[] fetchDataFromBuffers(int startIndex, int offset, int
length) {
+ int bufferIndex = startIndex;
+ int currentOffset = offset;
+ int remainingLength = length;
+ byte[] data = new byte[length]; // Create a new array to store the
complete data
+ int copyDestPos = 0;
+
+ while (remainingLength > 0) {
+ WrappedBuffer currentBuffer = sortWriteBuffer.buffers.get(bufferIndex);
+ byte[] currentBufferData = currentBuffer.getBuffer();
+ int currentBufferCapacity = currentBuffer.getSize();
+ int copyLength = Math.min(currentBufferCapacity - currentOffset,
remainingLength);
+
+ // Copy data from the current buffer to the data array
+ System.arraycopy(currentBufferData, currentOffset, data, copyDestPos,
copyLength);
+ remainingLength -= copyLength;
+ copyDestPos += copyLength;
+
+ // Move to the next buffer
+ bufferIndex++;
+ currentOffset = 0; // Start position in the new buffer is 0
+ }
+ return data;
+ }
+
@Override
public DataInputBuffer getKey() {
- SortWriteBuffer.WrappedBuffer keyWrappedBuffer =
- sortWriteBuffer.buffers.get(currentRecord.getKeyIndex());
- byte[] rawData = keyWrappedBuffer.getBuffer();
- keyBuffer.reset(rawData, currentRecord.getKeyOffSet(),
currentRecord.getKeyLength());
+ int keyIndex = currentRecord.getKeyIndex();
+ int keyOffset = currentRecord.getKeyOffSet();
+ int keyLength = currentRecord.getKeyLength();
+ byte[] keyData = fetchDataFromBuffers(keyIndex, keyOffset, keyLength);
+
+ // Reset keyBuffer with the assembled complete key and return
+ keyBuffer.reset(keyData, 0, keyLength);
return keyBuffer;
}
@Override
public DataInputBuffer getValue() {
- SortWriteBuffer.WrappedBuffer valueWrappedBuffer =
- sortWriteBuffer.buffers.get(currentRecord.getKeyIndex());
- byte[] rawData = valueWrappedBuffer.getBuffer();
+ int valueStartIndex = currentRecord.getKeyIndex();
int valueOffset = currentRecord.getKeyOffSet() +
currentRecord.getKeyLength();
- valueBuffer.reset(rawData, valueOffset, currentRecord.getValueLength());
+ int valueLength = currentRecord.getValueLength();
+
+ // Adjust start index and offset for the start of the value
+ while (valueOffset >=
sortWriteBuffer.buffers.get(valueStartIndex).getSize()) {
Review Comment:
Could this code be integrated into fetchDataFromBuffers?
##########
client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java:
##########
@@ -341,22 +341,57 @@ public SortBufferIterator(SortWriteBuffer<K, V>
sortWriteBuffer) {
this.iterator = sortWriteBuffer.records.iterator();
}
+ private byte[] fetchDataFromBuffers(int startIndex, int offset, int
length) {
+ int bufferIndex = startIndex;
Review Comment:
Since startIndex, offset, and length are only used here, do we need these variables?
There is also an extra data copy here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]