chungen0126 commented on code in PR #9392:
URL: https://github.com/apache/ozone/pull/9392#discussion_r2576041609
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -684,71 +698,74 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay,
boolean enablePiggybackin
metrics.getPendingContainerOpCountMetrics(PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + 50;
// write data more than 1 chunk
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- // since it's hitting full-buffer, it will call watchForCommit
- // and completes putBlock at least for first flushSize worth of data
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
- assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // Now do a flush.
- key.flush();
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ // since it's hitting full-buffer, it will call watchForCommit
+ // and completes putBlock at least for first flushSize worth of data
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+ assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(PutBlock));
+ assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // Now do a flush.
+ key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // dataLength > MAX_FLUSH_SIZE
- assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
- blockOutputStream.getTotalDataFlushedLength());
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ // dataLength > MAX_FLUSH_SIZE
+ assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // now close the stream, it will update ack length after watchForCommit
- key.close();
+ // now close the stream, it will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -107,259 +107,262 @@ void testContainerClose(boolean flushDelay, boolean
enablePiggybacking) throws E
private void testWatchForCommitWithCloseContainerException(OzoneClient
client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 4 buffers allocated worth of chunk size
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // flush is a sync call, all pending operations will complete
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
-
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have 4 buffers allocated worth of chunk size
assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
+ // flush is a sync call, all pending operations will complete
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
assertEquals(4, blockOutputStream.getBufferPool().getSize());
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
key.write(data1);
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
-
assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -107,259 +107,262 @@ void testContainerClose(boolean flushDelay, boolean
enablePiggybacking) throws E
private void testWatchForCommitWithCloseContainerException(OzoneClient
client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 4 buffers allocated worth of chunk size
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // flush is a sync call, all pending operations will complete
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
-
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have 4 buffers allocated worth of chunk size
assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
+ // flush is a sync call, all pending operations will complete
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
assertEquals(4, blockOutputStream.getBufferPool().getSize());
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
key.write(data1);
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
-
assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
}
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getStreamEntries().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
}
@ParameterizedTest
@MethodSource("clientParameters")
- void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
+ void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // acked by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // This will flush the data and update the flush length and the map.
- key.flush();
+ // This will flush the data and update the flush length and the map.
+ key.flush();
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
- stopAndRemove(pipeline.getNodes().get(1));
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ stopAndRemove(pipeline.getNodes().get(0));
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+ key.write(data1);
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
+ key.flush();
- Throwable ioException = checkForException(
- blockOutputStream.getIoException());
- // Since, 2 datanodes went down,
- // a) if the pipeline gets destroyed quickly it will hit
- // GroupMismatchException.
- // b) will hit close container exception if the container is closed
- // but pipeline is still not destroyed.
- // c) will fail with RaftRetryFailureException if the leader election
- // did not finish before the request retry count finishes.
- assertTrue(ioException instanceof RaftRetryFailureException
- || ioException instanceof GroupMismatchException
- || ioException instanceof ContainerNotOpenException);
+ assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -293,86 +297,89 @@ void testWriteExactlyFlushSize(boolean flushDelay,
boolean enablePiggybacking) t
final long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
// write data equal to 2 chunks
int dataLength = FLUSH_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 1,
- metrics.getContainerOpCountMetrics(PutBlock));
- // The WriteChunk and PutBlock can be completed soon.
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data equal flush Size = 2 chunks, at this time
- // buffer pool will have 2 buffers allocated worth of chunk size
-
- assertEquals(2, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
-
- // Before flush, if there was no pending PutBlock which means it is
complete.
- // It put a commit index into commitIndexMap.
- assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
-
- // Now do a flush.
- key.flush();
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- // The previously written data is equal to flushSize, so no action is
- // triggered when execute flush, if flushDelay is enabled.
- // If flushDelay is disabled, it will call waitOnFlushFutures to wait all
- // putBlocks finished. It was broken because WriteChunk and PutBlock
- // can be complete regardless of whether the flush executed or not.
- if (flushDelay) {
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+
+ assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(PutBlock));
+ // The WriteChunk and PutBlock can be completed soon.
assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
.isLessThanOrEqualTo(pendingWriteChunkCount + 2);
assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
- } else {
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- }
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data equal flush Size = 2 chunks, at this time
+ // buffer pool will have 2 buffers allocated worth of chunk size
+
+ assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ // Before flush, if there was no pending PutBlock which means it is
complete.
+ // It put a commit index into commitIndexMap.
+ assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount) ? 1 : 0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // Now do a flush.
+ key.flush();
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ // The previously written data is equal to flushSize, so no action is
+ // triggered when execute flush, if flushDelay is enabled.
+ // If flushDelay is disabled, it will call waitOnFlushFutures to wait
all
+ // putBlocks finished. It was broken because WriteChunk and PutBlock
+ // can be complete regardless of whether the flush executed or not.
+ if (flushDelay) {
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
+ } else {
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ }
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+ assertEquals(2, blockOutputStream.getBufferPool().getSize());
+
+ // No action is triggered when execute flush, BlockOutputStream will
not
+ // be updated.
+ assertEquals(flushDelay ? dataLength : 0,
+ blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // If the flushDelay feature is enabled, nothing happens.
+ // The assertions will be as same as those before flush.
+ // If it flushed, the Commit index will be removed.
+ assertEquals((flushDelay &&
+ (metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount)) ? 1 : 0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(flushDelay ? 0 : dataLength,
+ blockOutputStream.getTotalAckDataLength());
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
- assertEquals(2, blockOutputStream.getBufferPool().getSize());
-
- // No action is triggered when execute flush, BlockOutputStream will not
- // be updated.
- assertEquals(flushDelay ? dataLength : 0,
- blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // If the flushDelay feature is enabled, nothing happens.
- // The assertions will be as same as those before flush.
- // If it flushed, the Commit index will be removed.
- assertEquals((flushDelay &&
- (metrics.getPendingContainerOpCountMetrics(PutBlock) ==
pendingPutBlockCount)) ? 1 : 0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(flushDelay ? 0 : dataLength,
- blockOutputStream.getTotalAckDataLength());
-
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -429,52 +433,53 @@ private void testWriteMoreThanMaxFlushSize(OzoneClient
client)
private void testExceptionDuringClose(OzoneClient client) throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = 167;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(2);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(2);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- // This will flush the data and update the flush length and the map.
- key.flush();
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(2);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(2);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will hit exception
- key.close();
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will hit exception
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -672,78 +678,78 @@ void testDatanodeFailureWithPreAllocation(boolean
flushDelay, boolean enablePigg
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 3 * BLOCK_SIZE,
- ReplicationFactor.ONE);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ Pipeline pipeline;
+ try (OzoneOutputStream key = createKey(client, keyName, 3 * BLOCK_SIZE,
ReplicationFactor.ONE)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(3, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertEquals(3, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ // This will flush the data and update the flush length and
+ // the map.
+ key.flush();
- // This will flush the data and update the flush length and
- // the map.
- key.flush();
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(1, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen and
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. No write will happen and
+ key.write(data1);
+ key.flush();
- key.write(data1);
- key.flush();
-
- assertInstanceOf(RaftRetryFailureException.class,
- checkForException(blockOutputStream.getIoException()));
+ assertInstanceOf(RaftRetryFailureException.class,
+ checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ assertEquals(1, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -411,57 +418,61 @@ void testWriteMoreThanChunkSize(boolean flushDelay,
boolean enablePiggybacking)
PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
// write data more than 1 chunk
int dataLength = CHUNK_SIZE + 50;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ BufferPool bufferPool;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // we have just written data equal flush Size > 1 chunk, at this time
- // buffer pool will have 2 buffers allocated worth of chunk size
+ // we have just written data equal flush Size > 1 chunk, at this time
+ // buffer pool will have 2 buffers allocated worth of chunk size
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- assertEquals(2, bufferPool.getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ bufferPool = blockOutputStream.getBufferPool();
+ assertEquals(2, bufferPool.getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // This will flush the data and update the flush length and the map.
- key.flush();
- assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
- metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
+ metrics.getContainerOpCountMetrics(PutBlock));
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(2, bufferPool.getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(2, bufferPool.getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // flush ensures watchForCommit updates the total length acknowledged
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // flush ensures watchForCommit updates the total length acknowledged
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -493,165 +498,166 @@ private void testExceptionDuringClose(OzoneClient
client) throws Exception {
private void testWatchForCommitWithSingleNodeRatis(OzoneClient client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have up to 4 buffers allocated worth of chunk size
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(1, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
-
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(1, raftClient.getCommitInfoMap().size());
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getLocationInfoList().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName, 0,
ReplicationFactor.ONE)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have up to 4 buffers allocated worth of chunk size
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(1, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. No write will happen in the current block and
- // data will be rewritten to the next block.
-
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
- assertInstanceOf(RaftRetryFailureException.class,
+ assertInstanceOf(ContainerNotOpenException.class,
checkForException(blockOutputStream.getIoException()));
- assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(1, raftClient.getCommitInfoMap().size());
assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update ack length after watchForCommit
key.close();
+ }
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getLocationInfoList().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ }
+
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ String keyName = getKeyName();
+ int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+ byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ Pipeline pipeline;
+ try (OzoneOutputStream key = createKey(client, keyName, 0,
ReplicationFactor.ONE)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen in the current block and
+ // data will be rewritten to the next block.
+
+ key.write(data1);
+ key.flush();
+
+ assertInstanceOf(RaftRetryFailureException.class,
+ checkForException(blockOutputStream.getIoException()));
+ assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -590,65 +601,68 @@ void testWriteExactlyMaxFlushSize(boolean flushDelay,
boolean enablePiggybacking
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- // since it's hitting the full bufferCondition, it will call
watchForCommit
- // however, the outputstream will not wait for watchForCommit, but the
next call to
- // write() will need to wait for at least one watchForCommit, indirectly
when asking for new buffer allocation.
- bufferPool.waitUntilAvailable();
-
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
-
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // now close the stream, it will update ack length after watchForCommit
- key.close();
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ BufferPool bufferPool = blockOutputStream.getBufferPool();
+ // since it's hitting the full bufferCondition, it will call
watchForCommit
+ // however, the outputstream will not wait for watchForCommit, but the
next call to
+ // write() will need to wait for at least one watchForCommit,
indirectly when asking for new buffer allocation.
+ bufferPool.waitUntilAvailable();
+
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // now close the stream, it will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -372,51 +375,52 @@ void test2DatanodesFailure(boolean flushDelay, boolean
enablePiggybacking) throw
private void testWriteMoreThanMaxFlushSize(OzoneClient client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
- // This will flush the data and update the flush length and the map.
- key.flush();
+ // This will flush the data and update the flush length and the map.
+ key.flush();
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
- // As a part of handling the exception, 2 failed writeChunks will be
- // rewritten plus 1 putBlocks for flush
- // and one flush for partial chunk
- key.flush();
+ // As a part of handling the exception, 2 failed writeChunks will be
+ // rewritten plus 1 putBlocks for flush
+ // and one flush for partial chunk
+ key.flush();
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -493,165 +498,166 @@ private void testExceptionDuringClose(OzoneClient
client) throws Exception {
private void testWatchForCommitWithSingleNodeRatis(OzoneClient client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have up to 4 buffers allocated worth of chunk size
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertThat(blockOutputStream.getBufferPool().getSize())
- .isLessThanOrEqualTo(4);
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(1, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
-
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(1, raftClient.getCommitInfoMap().size());
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getLocationInfoList().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key =
- createKey(client, keyName, 0, ReplicationFactor.ONE);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName, 0,
ReplicationFactor.ONE)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have up to 4 buffers allocated worth of chunk size
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertThat(blockOutputStream.getBufferPool().getSize())
+ .isLessThanOrEqualTo(4);
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(1, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. No write will happen in the current block and
- // data will be rewritten to the next block.
-
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
- assertInstanceOf(RaftRetryFailureException.class,
+ assertInstanceOf(ContainerNotOpenException.class,
checkForException(blockOutputStream.getIoException()));
- assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(1, raftClient.getCommitInfoMap().size());
assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update ack length after watchForCommit
key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -195,64 +195,68 @@ void testWriteLessThanChunkSize(boolean flushDelay,
boolean enablePiggybacking)
metrics.getPendingContainerOpCountMetrics(PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = 50;
final int totalWriteLength = dataLength * 2;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have written data less than a chunk size, the data will just sit
- // in the buffer, with only one buffer being allocated in the buffer pool
-
- BufferPool bufferPool = blockOutputStream.getBufferPool();
- assertEquals(1, bufferPool.getSize());
- //Just the writtenDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- // no data will be flushed till now
- assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0, blockOutputStream.getTotalAckDataLength());
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ BufferPool bufferPool;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ keyOutputStream =
+ assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
- // commitIndex2FlushedData Map will be empty here
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // Total write data greater than or equal one chunk
- // size to make sure flush will sync data.
- key.write(data1);
- // This will flush the data and update the flush length and the map.
- key.flush();
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream =
+ assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // flush is a sync call, all pending operations will complete
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- // we have written data less than a chunk size, the data will just sit
- // in the buffer, with only one buffer being allocated in the buffer pool
+ // we have written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer
pool
- assertEquals(1, bufferPool.getSize());
- assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
- assertEquals(totalWriteLength,
- blockOutputStream.getTotalDataFlushedLength());
- assertEquals(0,
- blockOutputStream.getCommitIndex2flushedDataMap().size());
+ bufferPool = blockOutputStream.getBufferPool();
+ assertEquals(1, bufferPool.getSize());
+ //Just the writtenDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // flush ensures watchForCommit updates the total length acknowledged
- assertEquals(totalWriteLength,
blockOutputStream.getTotalAckDataLength());
+ // no data will be flushed till now
+ assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ // commitIndex2FlushedData Map will be empty here
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // Total write data greater than or equal one chunk
+ // size to make sure flush will sync data.
+ key.write(data1);
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // flush is a sync call, all pending operations will complete
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ // we have written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer
pool
+
+ assertEquals(1, bufferPool.getSize());
+ assertEquals(totalWriteLength,
blockOutputStream.getWrittenDataLength());
+ assertEquals(totalWriteLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // flush ensures watchForCommit updates the total length acknowledged
+ assertEquals(totalWriteLength,
blockOutputStream.getTotalAckDataLength());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
Review Comment:
Redundant 'close()'
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java:
##########
@@ -107,259 +107,262 @@ void testContainerClose(boolean flushDelay, boolean
enablePiggybacking) throws E
private void testWatchForCommitWithCloseContainerException(OzoneClient
client)
throws Exception {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
-
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 4 buffers allocated worth of chunk size
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // ack'd by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
-
- // This will flush the data and update the flush length and the map.
- key.flush();
-
- // flush is a sync call, all pending operations will complete
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // Close the containers on the Datanode and write more data
- TestHelper.waitForContainerClose(key, cluster);
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
- assertEquals(2, keyOutputStream.getStreamEntries().size());
- assertInstanceOf(ContainerNotOpenException.class,
- checkForException(blockOutputStream.getIoException()));
-
- // Make sure the retryCount is reset after the exception is handled
- assertEquals(0, keyOutputStream.getRetryCount());
- // commitInfoMap will remain intact as there is no server failure
- assertEquals(3, raftClient.getCommitInfoMap().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
- }
-
- @ParameterizedTest
- @MethodSource("clientParameters")
- void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
- OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
- try (OzoneClient client = newClient(cluster.getConf(), config)) {
- String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
- int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
- byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes at least putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
assertEquals(1, keyOutputStream.getStreamEntries().size());
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
// we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // buffer pool will have 4 buffers allocated worth of chunk size
assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- // since data written is still less than flushLength, flushLength will
- // still be 0.
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ assertEquals(MAX_FLUSH_SIZE,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
assertThat(blockOutputStream.getTotalAckDataLength())
.isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast flushSize worth of data buffer
- // where each entry corresponds to flushSize worth of data
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ .isLessThanOrEqualTo(1);
// This will flush the data and update the flush length and the map.
key.flush();
+ // flush is a sync call, all pending operations will complete
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
assertEquals(4, blockOutputStream.getBufferPool().getSize());
assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
-
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ // Close the containers on the Datanode and write more data
+ TestHelper.waitForContainerClose(key, cluster);
key.write(data1);
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
key.flush();
-
assertEquals(2, keyOutputStream.getStreamEntries().size());
- // now close the stream, It will update ack length after watchForCommit
- key.close();
+ assertInstanceOf(ContainerNotOpenException.class,
+ checkForException(blockOutputStream.getIoException()));
+
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // make sure the bufferPool is empty
- assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- assertEquals(0, keyOutputStream.getStreamEntries().size());
- // Written the same data twice
- byte[] bytes = ArrayUtils.addAll(data1, data1);
- validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ // commitInfoMap will remain intact as there is no server failure
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
}
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getStreamEntries().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
}
@ParameterizedTest
@MethodSource("clientParameters")
- void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
+ void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean
enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
- OzoneOutputStream key = createKey(client, keyName);
int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- KeyOutputStream keyOutputStream =
- assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- RatisBlockOutputStream blockOutputStream =
- assertInstanceOf(RatisBlockOutputStream.class,
- keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(MAX_FLUSH_SIZE,
- blockOutputStream.getTotalDataFlushedLength());
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // acked by all servers right here
- assertThat(blockOutputStream.getTotalAckDataLength())
- .isGreaterThanOrEqualTo(FLUSH_SIZE);
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(1);
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
- // This will flush the data and update the flush length and the map.
- key.flush();
+ // This will flush the data and update the flush length and the map.
+ key.flush();
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
- assertEquals(4, blockOutputStream.getBufferPool().getSize());
- assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
- .isLessThanOrEqualTo(2);
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- stopAndRemove(pipeline.getNodes().get(0));
- stopAndRemove(pipeline.getNodes().get(1));
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ stopAndRemove(pipeline.getNodes().get(0));
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+ key.write(data1);
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
+ key.flush();
- Throwable ioException = checkForException(
- blockOutputStream.getIoException());
- // Since, 2 datanodes went down,
- // a) if the pipeline gets destroyed quickly it will hit
- // GroupMismatchException.
- // b) will hit close container exception if the container is closed
- // but pipeline is still not destroyed.
- // c) will fail with RaftRetryFailureException if the leader election
- // did not finish before the request retry count finishes.
- assertTrue(ioException instanceof RaftRetryFailureException
- || ioException instanceof GroupMismatchException
- || ioException instanceof ContainerNotOpenException);
+ assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // now close the stream, It will update ack length after watchForCommit
+ key.close();
+ }
// Make sure the retryCount is reset after the exception is handled
assertEquals(0, keyOutputStream.getRetryCount());
- // now close the stream, It will update ack length after watchForCommit
+ // make sure the bufferPool is empty
+ assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ assertEquals(0, keyOutputStream.getStreamEntries().size());
+ // Written the same data twice
+ byte[] bytes = ArrayUtils.addAll(data1, data1);
+ validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+ }
+ }
- key.close();
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay,
enablePiggybacking);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ String keyName = getKeyName();
+ int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+ byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+ KeyOutputStream keyOutputStream;
+ RatisBlockOutputStream blockOutputStream;
+ try (OzoneOutputStream key = createKey(client, keyName)) {
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call
watchForCommit
+ // and completes atleast putBlock for first flushSize worth of data
+ keyOutputStream = assertInstanceOf(KeyOutputStream.class,
key.getOutputStream());
+
+ blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+ keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+ // we have just written data more than flush Size(2 chunks), at this
time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(MAX_FLUSH_SIZE,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a
blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // acked by all servers right here
+ assertThat(blockOutputStream.getTotalAckDataLength())
+ .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+ // watchForCommit will clean up atleast one entry from the map where
each
+ // entry corresponds to flushSize worth of data
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(1);
+
+ // This will flush the data and update the flush length and the map.
+ key.flush();
+
+ // Since the data in the buffer is already flushed, flush here will
have
+ // no impact on the counters and data structures
+
+ assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+ .isLessThanOrEqualTo(2);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ stopAndRemove(pipeline.getNodes().get(0));
+ stopAndRemove(pipeline.getNodes().get(1));
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
+ key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
+ key.flush();
+
+ Throwable ioException = checkForException(
+ blockOutputStream.getIoException());
+ // Since, 2 datanodes went down,
+ // a) if the pipeline gets destroyed quickly it will hit
+ // GroupMismatchException.
+ // b) will hit close container exception if the container is closed
+ // but pipeline is still not destroyed.
+ // c) will fail with RaftRetryFailureException if the leader election
+ // did not finish before the request retry count finishes.
+ assertTrue(ioException instanceof RaftRetryFailureException
+ || ioException instanceof GroupMismatchException
+ || ioException instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ assertEquals(0, keyOutputStream.getRetryCount());
+ // now close the stream, It will update ack length after watchForCommit
+
+ key.close();
Review Comment:
Redundant 'close()'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]