This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ccbfed5f82b HDDS-14026. Close OutputStream properly in
TestBlockOutputStream (#9392)
ccbfed5f82b is described below
commit ccbfed5f82bf4eaf5a7635ac491760b795ae442d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Dec 1 12:36:42 2025 +0100
HDDS-14026. Close OutputStream properly in TestBlockOutputStream (#9392)
---
.../ozone/client/rpc/TestBlockOutputStream.java | 640 ++++++++--------
.../rpc/TestBlockOutputStreamWithFailures.java | 850 ++++++++++-----------
2 files changed, 750 insertions(+), 740 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 579b6482201..65c91bb8a5d 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -195,64 +195,67 @@ void testWriteLessThanChunkSize(boolean flushDelay,
boolean enablePiggybacking)
metrics.getPendingContainerOpCountMetrics(PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = 50;
final int totalWriteLength = dataLength * 2;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have written data less than a chunk size, the data will just sit
- // in the buffer, with only one buffer being allocated in the buffer pool
-
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- assertEquals(1, bufferPool.getSize());
- //Just the writtenDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- // no data will be flushed till now
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ BufferPool bufferPool;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- // commitIndex2FlushedData Map will be empty here
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // Total write data greater than or equal one chunk
- // size to make sure flush will sync data.
- key.write(data1);
- // This will flush the data and update the flush length and the map.
- key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // flush is a sync call, all pending operations will complete
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- // we have written data less than a chunk size, the data will just sit
- // in the buffer, with only one buffer being allocated in the buffer pool
+ // we have written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer
pool
- assertEquals(1, bufferPool.getSize());
- assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
- assertEquals(totalWriteLength,
- blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
+ bufferPool = blockOutputStream.getBufferPool();
+ assertEquals(1, bufferPool.getSize());
+ //Just the writtenDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // flush ensures watchForCommit updates the total length acknowledged
- assertEquals(totalWriteLength,
blockOutputStream.getTotalAckDataLength());
+ // no data will be flushed till now
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // commitIndex2FlushedData Map will be empty here
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // Total write data greater than or equal one chunk
+ // size to make sure flush will sync data.
+ key.write(data1);
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // flush is a sync call, all pending operations will complete
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ // we have written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer
pool
+
+ assertEquals(1, bufferPool.getSize());
+ assertEquals(totalWriteLength,
blockOutputStream.getWrittenDataLength());
+ assertEquals(totalWriteLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // flush ensures watchForCommit updates the total length acknowledged
+ assertEquals(totalWriteLength,
blockOutputStream.getTotalAckDataLength());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ }
assertEquals(pendingWriteChunkCount,
metrics.getPendingContainerOpCountMetrics(WriteChunk));
@@ -293,86 +296,88 @@ void testWriteExactlyFlushSize(boolean flushDelay,
boolean enablePiggybacking) t
final long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
// write data equal to 2 chunks
int dataLength = FLUSH_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 1,
- metrics.getContainerOpCountMetrics(PutBlock));
- // The WriteChunk and PutBlock can be completed soon.
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data equal flush Size = 2 chunks, at this time
- // buffer pool will have 2 buffers allocated worth of chunk size
-
- assertEquals(2, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
-
- // Before flush, if there was no pending PutBlock which means it is
complete.
- // It put a commit index into commitIndexMap.
- assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
-
- // Now do a flush.
- key.flush();
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- // The previously written data is equal to flushSize, so no action is
- // triggered when execute flush, if flushDelay is enabled.
- // If flushDelay is disabled, it will call waitOnFlushFutures to wait all
- // putBlocks finished. It was broken because WriteChunk and PutBlock
- // can be complete regardless of whether the flush executed or not.
- if (flushDelay) {
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+
+ assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(PutBlock));
+ // The WriteChunk and PutBlock can be completed soon.
assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
.isLessThanOrEqualTo(pendingWriteChunkCount + 2);
assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
- } else {
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- }
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data equal flush Size = 2 chunks, at this time
+ // buffer pool will have 2 buffers allocated worth of chunk size
+
+ assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ // Before flush, if there was no pending PutBlock which means it is
complete.
+ // It put a commit index into commitIndexMap.
+ assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // Now do a flush.
+ key.flush();
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ // The previously written data is equal to flushSize, so no action is
+ // triggered when execute flush, if flushDelay is enabled.
+ // If flushDelay is disabled, it will call waitOnFlushFutures to wait
all
+ // putBlocks finished. It was broken because WriteChunk and PutBlock
+ // can be complete regardless of whether the flush executed or not.
+ if (flushDelay) {
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
+ } else {
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ }
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+ assertEquals(2, blockOutputStream.getBufferPool().getSize());
+
+ // No action is triggered when execute flush, BlockOutputStream will
not
+ // be updated.
+ assertEquals(flushDelay ? dataLength : 0,
+ blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // If the flushDelay feature is enabled, nothing happens.
+ // The assertions will be as same as those before flush.
+ // If it flushed, the Commit index will be removed.
+ assertEquals((flushDelay &&
+ (metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount)) ? 1 : 0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(flushDelay ? 0 : dataLength,
+ blockOutputStream.getTotalAckDataLength());
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
- assertEquals(2, blockOutputStream.getBufferPool().getSize());
-
- // No action is triggered when execute flush, BlockOutputStream will not
- // be updated.
- assertEquals(flushDelay ? dataLength : 0,
- blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // If the flushDelay feature is enabled, nothing happens.
- // The assertions will be as same as those before flush.
- // If it flushed, the Commit index will be removed.
- assertEquals((flushDelay &&
- (metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount)) ? 1 : 0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(flushDelay ? 0 : dataLength,
- blockOutputStream.getTotalAckDataLength());
-
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ }
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
@@ -411,57 +416,60 @@ void testWriteMoreThanChunkSize(boolean flushDelay,
boolean enablePiggybacking)
PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
// write data more than 1 chunk
int dataLength = CHUNK_SIZE + 50;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ BufferPool bufferPool;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // we have just written data equal flush Size > 1 chunk, at this time
- // buffer pool will have 2 buffers allocated worth of chunk size
+ // we have just written data equal flush Size > 1 chunk, at this time
+ // buffer pool will have 2 buffers allocated worth of chunk size
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- assertEquals(2, bufferPool.getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ bufferPool = blockOutputStream.getBufferPool();
+ assertEquals(2, bufferPool.getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // This will flush the data and update the flush length and the map.
- key.flush();
- assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
- metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
+ metrics.getContainerOpCountMetrics(PutBlock));
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(2, bufferPool.getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(2, bufferPool.getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // flush ensures watchForCommit updates the total length acknowledged
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // flush ensures watchForCommit updates the total length acknowledged
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ }
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
assertEquals(0, bufferPool.computeBufferData());
@@ -501,56 +509,56 @@ void testWriteMoreThanFlushSize(boolean flushDelay,
boolean enablePiggybacking)
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = FLUSH_SIZE + 50;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
-
- assertEquals(3, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- // Before flush, if there was no pending PutBlock which means it is
complete.
- // It put a commit index into commitIndexMap.
- assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(3, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- key.flush();
- if (flushDelay) {
- // If the flushDelay feature is enabled, nothing happens.
- // The assertions will be as same as those before flush.
assertEquals(FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ // Before flush, if there was no pending PutBlock which means it is
complete.
+ // It put a commit index into commitIndexMap.
assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- } else {
- assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
-
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
+ key.flush();
+ if (flushDelay) {
+ // If the flushDelay feature is enabled, nothing happens.
+ // The assertions will be as same as those before flush.
+ assertEquals(FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ } else {
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ }
}
-
- key.close();
-
assertEquals(pendingWriteChunkCount,
metrics.getPendingContainerOpCountMetrics(WriteChunk));
assertEquals(pendingPutBlockCount,
@@ -590,65 +598,67 @@ void testWriteExactlyMaxFlushSize(boolean flushDelay,
boolean enablePiggybacking
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- // since it's hitting the full bufferCondition, it will call
watchForCommit
- // however, the outputstream will not wait for watchForCommit, but the
next call to
- // write() will need to wait for at least one watchForCommit, indirectly
when asking for new buffer allocation.
- bufferPool.waitUntilAvailable();
-
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
-
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // now close the stream, it will update ack length after watchForCommit
- key.close();
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ BufferPool bufferPool = blockOutputStream.getBufferPool();
+ // since it's hitting the full bufferCondition, it will call
watchForCommit
+ // however, the outputstream will not wait for watchForCommit, but the
next call to
+ // write() will need to wait for at least one watchForCommit,
indirectly when asking for new buffer allocation.
+ bufferPool.waitUntilAvailable();
+
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // now close the stream, it will update ack length after watchForCommit
+ }
assertEquals(pendingWriteChunkCount,
metrics.getPendingContainerOpCountMetrics(WriteChunk));
assertEquals(pendingPutBlockCount,
@@ -684,71 +694,73 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay,
boolean enablePiggybackin
metrics.getPendingContainerOpCountMetrics(PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + 50;
// write data more than 1 chunk
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- // since it's hitting full-buffer, it will call watchForCommit
- // and completes putBlock at least for first flushSize worth of data
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
- assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // Now do a flush.
- key.flush();
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ // since it's hitting full-buffer, it will call watchForCommit
+ // and completes putBlock at least for first flushSize worth of data
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+ assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(PutBlock));
+ assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // Now do a flush.
+ key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // dataLength > MAX_FLUSH_SIZE
- assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
- blockOutputStream.getTotalDataFlushedLength());
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ // dataLength > MAX_FLUSH_SIZE
+ assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // now close the stream, it will update ack length after watchForCommit
- key.close();
+ // now close the stream, it will update ack length after watchForCommit
+ }
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index ff07b599a9c..70e15f9b677 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -107,116 +107,27 @@ void testContainerClose(boolean flushDelay, boolean
enablePiggybacking) throws E
private void testWatchForCommitWithCloseContainerException(OzoneClient
client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 4 buffers allocated worth of chunk size
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // flush is a sync call, all pending operations will complete
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
-
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have 4 buffers allocated worth of chunk size
assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
@@ -224,14 +135,15 @@ void testWatchForCommitDatanodeFailure(boolean
flushDelay, boolean enablePiggyba
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
+ // flush is a sync call, all pending operations will complete
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -239,127 +151,215 @@ void testWatchForCommitDatanodeFailure(boolean
flushDelay, boolean enablePiggyba
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
key.write(data1);
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
-
assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
}
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getStreamEntries().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
}
@ParameterizedTest
@MethodSource("clientParameters")
- void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
+ void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // acked by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
+ // This will flush the data and update the flush length and the map.
+ key.flush();
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
- stopAndRemove(pipeline.getNodes().get(1));
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ stopAndRemove(pipeline.getNodes().get(0));
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+ key.write(data1);
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
+ key.flush();
- Throwable ioException = checkForException(
- blockOutputStream.getIoException());
- // Since, 2 datanodes went down,
- // a) if the pipeline gets destroyed quickly it will hit
- // GroupMismatchException.
- // b) will hit close container exception if the container is closed
- // but pipeline is still not destroyed.
- // c) will fail with RaftRetryFailureException if the leader election
- // did not finish before the request retry count finishes.
- assertTrue(ioException instanceof RaftRetryFailureException
- || ioException instanceof GroupMismatchException
- || ioException instanceof ContainerNotOpenException);
+ assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ }
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // now close the stream, It will update ack length after watchForCommit
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getStreamEntries().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ }
+ }
- key.close();
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ String keyName = getKeyName();
+ int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+ byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes atleast putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // acked by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ stopAndRemove(pipeline.getNodes().get(0));
+ stopAndRemove(pipeline.getNodes().get(1));
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
+ key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
+ key.flush();
+
+ Throwable ioException = checkForException(
+ blockOutputStream.getIoException());
+ // Since, 2 datanodes went down,
+ // a) if the pipeline gets destroyed quickly it will hit
+ // GroupMismatchException.
+ // b) will hit close container exception if the container is closed
+ // but pipeline is still not destroyed.
+ // c) will fail with RaftRetryFailureException if the leader election
+ // did not finish before the request retry count finishes.
+ assertTrue(ioException instanceof RaftRetryFailureException
+ || ioException instanceof GroupMismatchException
+ || ioException instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ // now close the stream, It will update ack length after watchForCommit
+
+ }
assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
@@ -372,51 +372,51 @@ void test2DatanodesFailure(boolean flushDelay, boolean
enablePiggybacking) throw
private void testWriteMoreThanMaxFlushSize(OzoneClient client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
- // This will flush the data and update the flush length and the map.
- key.flush();
+ // This will flush the data and update the flush length and the map.
+ key.flush();
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
- // As a part of handling the exception, 2 failed writeChunks will be
- // rewritten plus 1 putBlocks for flush
- // and one flush for partial chunk
- key.flush();
+ // As a part of handling the exception, 2 failed writeChunks will be
+ // rewritten plus 1 putBlocks for flush
+ // and one flush for partial chunk
+ key.flush();
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
+ }
// make sure the bufferPool is empty
assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
@@ -429,52 +429,52 @@ private void testWriteMoreThanMaxFlushSize(OzoneClient
client)
private void testExceptionDuringClose(OzoneClient client) throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = 167;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(2);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(2);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- // This will flush the data and update the flush length and the map.
- key.flush();
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(2);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(2);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will hit exception
- key.close();
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will hit exception
+ }
assertInstanceOf(ContainerNotOpenException.class,
checkForException(blockOutputStream.getIoException()));
@@ -493,119 +493,28 @@ private void testExceptionDuringClose(OzoneClient
client) throws Exception {
private void testWatchForCommitWithSingleNodeRatis(OzoneClient client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have up to 4 buffers allocated worth of chunk size
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(1, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
-
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(1, raftClient.getCommitInfoMap().size());
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getLocationInfoList().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName, 0,
ReplicationFactor.ONE)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have up to 4 buffers allocated worth of chunk size
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
@@ -613,10 +522,10 @@ void testDatanodeFailureWithSingleNode(boolean
flushDelay, boolean enablePiggyba
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
@@ -624,34 +533,124 @@ void testDatanodeFailureWithSingleNode(boolean
flushDelay, boolean enablePiggyba
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(1, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. No write will happen in the current block and
- // data will be rewritten to the next block.
-
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
- assertInstanceOf(RaftRetryFailureException.class,
+ assertInstanceOf(ContainerNotOpenException.class,
checkForException(blockOutputStream.getIoException()));
- assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(1, raftClient.getCommitInfoMap().size());
assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update ack length after watchForCommit
- key.close();
+ }
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getLocationInfoList().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ }
+
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ String keyName = getKeyName();
+ int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+ byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ Pipeline pipeline;
+ try (OzoneOutputStream key = createKey(client, keyName, 0,
ReplicationFactor.ONE)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen in the current block and
+ // data will be rewritten to the next block.
+
+ key.write(data1);
+ key.flush();
+
+ assertInstanceOf(RaftRetryFailureException.class,
+ checkForException(blockOutputStream.getIoException()));
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ }
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -672,78 +671,77 @@ void testDatanodeFailureWithPreAllocation(boolean
flushDelay, boolean enablePigg
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 3 * BLOCK_SIZE,
- ReplicationFactor.ONE);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ Pipeline pipeline;
+ try (OzoneOutputStream key = createKey(client, keyName, 3 * BLOCK_SIZE,
ReplicationFactor.ONE)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(3, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertEquals(3, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // This will flush the data and update the flush length and
- // the map.
- key.flush();
+ // This will flush the data and update the flush length and
+ // the map.
+ key.flush();
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(1, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. No write will happen and
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen and
- key.write(data1);
- key.flush();
+ key.write(data1);
+ key.flush();
- assertInstanceOf(RaftRetryFailureException.class,
- checkForException(blockOutputStream.getIoException()));
+ assertInstanceOf(RaftRetryFailureException.class,
+ checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ assertEquals(1, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ }
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]