This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 78ca327 HDDS-6358. EC: Refactor ECKeyOutputStream#write() (#3120)
78ca327 is described below
commit 78ca327e7791accfd2b72375a01b441b593ba376
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Feb 25 05:05:44 2022 +0800
HDDS-6358. EC: Refactor ECKeyOutputStream#write() (#3120)
---
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 101 ++++++++-------------
.../hadoop/ozone/client/TestOzoneECClient.java | 4 +-
2 files changed, 38 insertions(+), 67 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 4be43e7..1066f0a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -151,52 +151,34 @@ public class ECKeyOutputStream extends KeyOutputStream {
|| ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
- if (len == 0) {
- return;
- }
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
-
- int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
- .getCurrentStreamIdx();
- int currentChunkBufferRemainingLength =
- ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
- int currentChunkBufferLen =
- ecChunkBufferCache.dataBuffers[currentStreamIdx]
- .position();
- int maxLenToCurrChunkBuffer = Math.min(len, ecChunkSize);
- int currentWriterChunkLenToWrite =
- Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
- int pos = handleDataWrite(currentStreamIdx, b, off,
- currentWriterChunkLenToWrite,
- currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
- checkAndWriteParityCells(pos, false);
- int remLen = len - currentWriterChunkLenToWrite;
- int iters = remLen / ecChunkSize;
- int lastCellSize = remLen % ecChunkSize;
- off += currentWriterChunkLenToWrite;
-
- while (iters > 0) {
- currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
- .getCurrentStreamIdx();
- pos = handleDataWrite(currentStreamIdx, b, off, ecChunkSize, true);
- off += ecChunkSize;
- iters--;
- checkAndWriteParityCells(pos, iters > 0 || remLen > 0);
- }
-
- if (lastCellSize > 0) {
- currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
- .getCurrentStreamIdx();
- pos = handleDataWrite(currentStreamIdx, b, off,
- lastCellSize, false);
- checkAndWriteParityCells(pos, false);
+ int rem = len;
+ while (rem > 0) {
+ try {
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ int currentStreamIdx = blockOutputStreamEntryPool
+ .getCurrentStreamEntry().getCurrentStreamIdx();
+ int bufferRem =
+ ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
+ int expectedWriteLen = Math.min(rem, Math.min(bufferRem, ecChunkSize));
+ int oldPos =
+ ecChunkBufferCache.dataBuffers[currentStreamIdx].position();
+ int pos =
+ handleDataWrite(currentStreamIdx, b, off, expectedWriteLen,
+ oldPos + expectedWriteLen == ecChunkSize);
+ checkAndWriteParityCells(pos);
+ long writtenLength = pos - oldPos;
+ rem -= writtenLength;
+ off += writtenLength;
+ } catch (Exception e) {
+ markStreamClosed();
+ throw new IOException(e.getMessage());
+ }
}
writeOffset += len;
}
private StripeWriteStatus rewriteStripeToNewBlockGroup(
- long failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
- throws IOException {
+ long failedStripeDataSize, boolean close) throws IOException {
int[] failedDataStripeChunkLens = new int[numDataBlks];
int[] failedParityStripeChunkLens = new int[numParityBlks];
final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
@@ -259,9 +241,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
// In most cases this should not happen except in the case stripe size and
// block size same.
newBlockGroupStreamEntry.close();
- if (allocateBlockIfFull) {
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
- }
} else {
newBlockGroupStreamEntry.resetToFirstEntry();
}
@@ -269,20 +248,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
return StripeWriteStatus.SUCCESS;
}
- private void checkAndWriteParityCells(int lastDataBuffPos,
- boolean allocateBlockIfFull) throws IOException {
- //check data blocks finished
- //If index is > datanum blks
+ private void checkAndWriteParityCells(int lastDataBuffPos)
+ throws IOException {
+ // Check data blocks finished
+ // If index > numDataBlks
ECBlockOutputStreamEntry currentStreamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
boolean shouldClose = currentStreamEntry.getRemaining() <= 0;
- if (handleParityWrites(ecChunkSize, allocateBlockIfFull,
- shouldClose) == StripeWriteStatus.FAILED) {
- handleStripeFailure(numDataBlks * ecChunkSize, allocateBlockIfFull,
- shouldClose);
+ if (handleParityWrites(ecChunkSize, shouldClose)
+ == StripeWriteStatus.FAILED) {
+ handleStripeFailure(numDataBlks * ecChunkSize, shouldClose);
} else {
// At this stage stripe write is successful.
currentStreamEntry.updateBlockGroupToAckedPosition(
@@ -293,8 +271,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private StripeWriteStatus handleParityWrites(int parityCellSize,
- boolean allocateBlockIfFull, boolean isLastStripe)
- throws IOException {
+ boolean isLastStripe) throws IOException {
writeParityCells(parityCellSize);
if (hasWriteFailure()) {
return StripeWriteStatus.FAILED;
@@ -315,9 +292,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
if (streamEntry.getRemaining() <= 0) {
streamEntry.close();
- if (allocateBlockIfFull) {
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
- }
} else {
streamEntry.resetToFirstEntry();
}
@@ -522,9 +496,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
final int parityCellSize =
(int) (lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize);
addPadding(parityCellSize);
- if (handleParityWrites(parityCellSize,
- false, true) == StripeWriteStatus.FAILED) {
- handleStripeFailure(lastStripeSize, false, true);
+ if (handleParityWrites(parityCellSize, true)
+ == StripeWriteStatus.FAILED) {
+ handleStripeFailure(lastStripeSize, true);
} else {
blockOutputStreamEntryPool.getCurrentStreamEntry()
.updateBlockGroupToAckedPosition(
@@ -545,14 +519,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
ecChunkBufferCache.release();
}
- private void handleStripeFailure(long lastStripeSize,
- boolean allocateBlockIfFull, boolean isClose)
+ private void handleStripeFailure(long lastStripeSize, boolean isClose)
throws IOException {
StripeWriteStatus stripeWriteStatus;
for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
- stripeWriteStatus =
- rewriteStripeToNewBlockGroup(lastStripeSize,
- allocateBlockIfFull, isClose);
+ stripeWriteStatus = rewriteStripeToNewBlockGroup(lastStripeSize, isClose);
if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
return;
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index bd9ac20..0ee6808 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -598,9 +598,9 @@ public class TestOzoneECClient {
.size() == clusterSize);
}
- @Test(expected = IllegalStateException.class)
// The mocked impl throws IllegalStateException when there are not enough
- // nodes in allocateBlock request.
+ // nodes in allocateBlock request. But write() converts it to IOException.
+ @Test(expected = IOException.class)
public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
OzoneConfiguration con = new OzoneConfiguration();
con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]