This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new b9adbcd HDDS-6348: EC: PartialStripe failure handling logic is
writing padding bytes also to DNs (#3124)
b9adbcd is described below
commit b9adbcd89ad84d71e73ddff26f4e039c6aeba499
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Wed Feb 23 21:30:18 2022 -0800
HDDS-6348: EC: PartialStripe failure handling logic is writing padding
bytes also to DNs (#3124)
---
.../client/io/BlockOutputStreamEntryPool.java | 3 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 101 +++++++++++----------
.../hadoop/ozone/client/TestOzoneECClient.java | 66 ++++++++++++++
3 files changed, 123 insertions(+), 47 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 1c09823..856d486 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -324,7 +324,8 @@ public class BlockOutputStreamEntryPool {
if (keyArgs != null) {
// in test, this could be null
long length = getKeyLength();
- Preconditions.checkArgument(offset == length);
+ Preconditions.checkArgument(offset == length,
+ "Expected offset: " + offset + " expected len: " + length);
keyArgs.setDataSize(length);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is multipart upload part file upload, we should not
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 9ef3d06..266ee90 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -196,10 +196,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private StripeWriteStatus rewriteStripeToNewBlockGroup(
- int failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
+ long failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
throws IOException {
- long[] failedDataStripeChunkLens = new long[numDataBlks];
- long[] failedParityStripeChunkLens = new long[numParityBlks];
+ int[] failedDataStripeChunkLens = new int[numDataBlks];
+ int[] failedParityStripeChunkLens = new int[numParityBlks];
final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
for (int i = 0; i < numDataBlks; i++) {
failedDataStripeChunkLens[i] = dataBuffers[i].limit();
@@ -227,18 +227,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
blockOutputStreamEntryPool.getCurrentStreamEntry();
long totalLenToWrite = failedStripeDataSize;
for (int i = 0; i < numDataBlks; i++) {
- long currentLen = totalLenToWrite < failedDataStripeChunkLens[i] ?
+ int currentLen = (int) (totalLenToWrite < failedDataStripeChunkLens[i] ?
totalLenToWrite :
- failedDataStripeChunkLens[i];
+ failedDataStripeChunkLens[i]);
if (currentLen > 0) {
- handleOutputStreamWrite(i, currentLen, true, false);
+ handleOutputStreamWrite(i, currentLen, false);
}
currentStreamEntry.useNextBlockStream();
totalLenToWrite -= currentLen;
}
for (int i = 0; i < (numParityBlks); i++) {
handleOutputStreamWrite(i + numDataBlks, failedParityStripeChunkLens[i],
- true, true);
+ true);
currentStreamEntry.useNextBlockStream();
}
@@ -373,47 +373,51 @@ public class ECKeyOutputStream extends KeyOutputStream {
for (int i =
numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
// Move the stream entry cursor to parity block index
- handleParityWrite(i, parityCellSize, true);
+ handleParityWrite(i, parityCellSize);
}
}
private int handleDataWrite(int currIdx, byte[] b, int off, long len,
- boolean isFullCell) throws IOException {
+ boolean isFullCell) {
int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
- handleOutputStreamWrite(currIdx, len, isFullCell, false);
- if (pos == ecChunkSize) {
+ if (isFullCell) {
+ Preconditions.checkArgument(pos == ecChunkSize,
+ "When a full cell is passed, the pos: " + pos
+ + " should match the ec chunk size.");
+ handleOutputStreamWrite(currIdx, pos, false);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
return pos;
}
- private void handleParityWrite(int currIdx, long len, boolean isFullCell) {
- handleOutputStreamWrite(currIdx, len, isFullCell, true);
+ private void handleParityWrite(int currIdx, int len) {
+ handleOutputStreamWrite(currIdx, len, true);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
- private void handleOutputStreamWrite(int currIdx, long len,
- boolean isFullCell, boolean isParity) {
-
- BlockOutputStreamEntry current =
- blockOutputStreamEntryPool.getCurrentStreamEntry();
-
- if (isFullCell) {
- ByteBuffer bytesToWrite = isParity ?
- ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
- ecChunkBufferCache.getDataBuffers()[currIdx];
- try {
- // Since it's a fullcell, let's write all content from buffer.
- writeToOutputStream(current, len, bytesToWrite.array(),
- bytesToWrite.limit(), 0, isParity);
- } catch (Exception e) {
- markStreamAsFailed(e);
- }
- }
- }
-
- private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+ private void handleOutputStreamWrite(int currIdx, int len, boolean isParity)
{
+ ByteBuffer bytesToWrite = isParity ?
+ ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
+ ecChunkBufferCache.getDataBuffers()[currIdx];
+ try {
+ // Since it's a full cell, let's write all content from buffer.
+ // At a time we write at most one cell size in EC. So, it should be safe
+ + // to cast the len to int to use the super class defined write API.
+ // The len cannot be bigger than cell buffer size.
+ assert len <= ecChunkSize : " The len: " + len + ". EC chunk size: "
+ + ecChunkSize;
+ assert len <= bytesToWrite
+ .limit() : " The len: " + len + ". Chunk buffer limit: "
+ + bytesToWrite.limit();
+ writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
+ bytesToWrite.array(), len, 0, isParity);
+ } catch (Exception e) {
+ markStreamAsFailed(e);
+ }
+ }
+
+ private long writeToOutputStream(ECBlockOutputStreamEntry current,
byte[] b, int writeLen, int off, boolean isParity)
throws IOException {
try {
@@ -424,8 +428,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
current.write(b, off, writeLen);
} catch (IOException ioe) {
- LOG.debug("Exception:: writeLen: " + writeLen + ", total len:" + len,
- ioe);
+ LOG.debug(
+ "Exception while writing the cell buffers. The writeLen: " + writeLen
+ + ". The block internal index is: "
+ + current
+ .getCurrentStreamIdx(), ioe);
handleException(current, ioe);
}
return writeLen;
@@ -494,7 +501,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
closed = true;
try {
- final int lastStripeSize = getCurrentDataStripeSize();
+ final long lastStripeSize = getCurrentDataStripeSize();
if (isPartialStripe(lastStripeSize)) {
ByteBuffer bytesToWrite =
ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
@@ -502,11 +509,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
// Finish writing the current partial cached chunk
if (bytesToWrite.position() % ecChunkSize != 0) {
- final BlockOutputStreamEntry current =
+ final ECBlockOutputStreamEntry current =
blockOutputStreamEntryPool.getCurrentStreamEntry();
try {
byte[] array = bytesToWrite.array();
- writeToOutputStream(current, bytesToWrite.position(), array,
+ writeToOutputStream(current, array,
bytesToWrite.position(), 0, false);
} catch (Exception e) {
markStreamAsFailed(e);
@@ -514,7 +521,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
final int parityCellSize =
- lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
+ (int) (lastStripeSize < ecChunkSize ? lastStripeSize :
ecChunkSize);
addPadding(parityCellSize);
if (handleParityWrites(parityCellSize,
false, true) == StripeWriteStatus.FAILED) {
@@ -529,7 +536,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
closeCurrentStreamEntry();
- Preconditions.checkArgument(writeOffset == offset);
+ Preconditions.checkArgument(writeOffset == offset,
+ "Expected writeOffset= " + writeOffset
+ + " Expected offset=" + offset);
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
@@ -537,7 +546,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
ecChunkBufferCache.release();
}
- private void handleStripeFailure(int lastStripeSize,
+ private void handleStripeFailure(long lastStripeSize,
boolean allocateBlockIfFull, boolean isClose)
throws IOException {
StripeWriteStatus stripeWriteStatus;
@@ -575,13 +584,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
buf.position(limit);
}
- private boolean isPartialStripe(int stripeSize) {
- return stripeSize > 0 && stripeSize < numDataBlks * ecChunkSize;
+ private boolean isPartialStripe(long stripeSize) {
+ return stripeSize > 0 && stripeSize < (numDataBlks * ecChunkSize);
}
- private int getCurrentDataStripeSize() {
+ private long getCurrentDataStripeSize() {
final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
- int lastStripeSize = 0;
+ long lastStripeSize = 0;
for (int i = 0; i < numDataBlks; i++) {
lastStripeSize += dataBuffers[i].position();
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 8b2a8a1..8a17d81 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -843,6 +843,72 @@ public class TestOzoneECClient {
}
}
+ @Test
+ public void testPartialStripeWithPartialChunkRetry()
+ throws IOException {
+ close();
+ OzoneConfiguration con = new OzoneConfiguration();
+ // block size of 3KB could hold 3 full stripes
+ con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 3,
StorageUnit.KB);
+ con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES, 3);
+ MultiNodePipelineBlockAllocator blkAllocator =
+ new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
15);
+ createNewClient(con, blkAllocator);
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ int numFullChunks = 7;
+ //Prepare additional partial chunk.
+ int partialChunkSize = 1020;
+ byte[] partialChunk = new byte[partialChunkSize];
+ Arrays.fill(partialChunk, 0, partialChunk.length, "1".getBytes(UTF_8)[0]);
+
+ // A partial chunk to trigger partialStripe check
+ // in ECKeyOutputStream.close()
+ int inSize = chunkSize;
+ try (OzoneOutputStream out = bucket.createKey(keyName, inSize,
+ new ECReplicationConfig(dataBlocks, parityBlocks,
+ ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
+ for (int i = 0; i < numFullChunks; i++) {
+ out.write(inputChunks[i % dataBlocks]);
+ }
+
+ out.write(partialChunk);
+
+ int[] nodesIndexesToMarkFailure = new int[] {0, 4};
+ List<DatanodeDetails> failedDNs = new ArrayList<>();
+ List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+ for (int j = 0; j < nodesIndexesToMarkFailure.length; j++) {
+ failedDNs.add(DatanodeDetails
+ .getFromProtoBuf(dns.get(nodesIndexesToMarkFailure[j])));
+ }
+
+ // First let's set storage as bad
+ ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+ }
+
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ for (int i = 0; i < numFullChunks; i++) {
+ Assert.assertEquals(inputChunks[i % dataBlocks].length,
+ is.read(fileContent));
+ Assert.assertTrue("Expected: " + new String(inputChunks[i %
dataBlocks],
+ UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+ Arrays.equals(inputChunks[i % dataBlocks], fileContent));
+ }
+
+ byte[] partialChunkToRead = new byte[partialChunkSize];
+ Assert
+ .assertEquals(partialChunkToRead.length,
is.read(partialChunkToRead));
+ Assert.assertTrue(Arrays.equals(partialChunk, partialChunkToRead));
+
+ Assert.assertEquals(-1, is.read(partialChunkToRead));
+ }
+ }
+
@Test(expected = NotImplementedException.class)
public void testFlushShouldThrowNotImplementedException() throws IOException
{
store.createVolume(volumeName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]