hemantk-12 commented on code in PR #5707:
URL: https://github.com/apache/ozone/pull/5707#discussion_r1427667244


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java:
##########
@@ -18,691 +18,690 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import java.io.IOException;
-import java.io.OutputStream;
+import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
-import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
 import org.junit.jupiter.api.AfterAll;
-import org.junit.Assert;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-/**
- * Tests BlockOutputStream class.
- */
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.container.TestHelper.validateData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @Timeout(300)
-public class TestBlockOutputStream {
-
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf = new OzoneConfiguration();
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int flushSize;
-  private static int maxFlushSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true
-   *
-   * @throws IOException
-   */
-  @BeforeAll
-  public static void init() throws Exception {
-    chunkSize = 100;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
+class TestBlockOutputStream {
 
-    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
-    clientConfig.setChecksumType(ChecksumType.NONE);
-    clientConfig.setStreamBufferFlushDelay(false);
-    conf.setFromObject(clientConfig);
+  static final int CHUNK_SIZE = 100;
+  static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+  static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+  static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+  static final String VOLUME = "testblockoutputstream";
+  static final String BUCKET = VOLUME;
+
+  private MiniOzoneCluster cluster;
 
+  static MiniOzoneCluster createCluster() throws IOException,
+      InterruptedException, TimeoutException {
+
+    OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
     conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
-        StorageUnit.MB);
-
-    cluster = MiniOzoneCluster.newBuilder(conf)
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB);
+    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 3);
+
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
+    conf.setFromObject(raftClientConfig);
+
+    RatisClientConfig ratisClientConfig =
+        conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
+    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
+    conf.setFromObject(ratisClientConfig);
+
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .setTotalPipelineNumLimit(3)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
+        .setBlockSize(BLOCK_SIZE)
+        .setChunkSize(CHUNK_SIZE)
+        .setStreamBufferFlushSize(FLUSH_SIZE)
+        .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
         .build();
     cluster.waitForClusterToBeReady();
-    //the easiest way to create an open container is creating a key
-    client = OzoneClientFactory.getRpcClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "testblockoutputstream";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
+
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
+        180000);
+
+    try (OzoneClient client = cluster.newClient()) {
+      ObjectStore objectStore = client.getObjectStore();
+      objectStore.createVolume(VOLUME);
+      objectStore.getVolume(VOLUME).createBucket(BUCKET);
+    }
+
+    return cluster;
   }
 
-  private String getKeyName() {
-    return UUID.randomUUID().toString();
+  @BeforeAll
+  void init() throws Exception {
+    cluster = createCluster();
   }
 
-  /**
-   * Shutdown MiniDFSCluster.
-   */
   @AfterAll
-  public static void shutdown() {
-    IOUtils.closeQuietly(client);
+  void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
-  @Test
-  public void testBufferCaching() throws Exception {
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
-    String keyName = getKeyName();
-    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = 50;
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
-            .getBytes(UTF_8);
-    key.write(data1);
-    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
-
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
-
-    // we have just written data less than a chunk size, the data will just sit
-    // in the buffer, with only one buffer being allocated in the buffer pool
-
-    Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
-    //Just the writtenDataLength will be updated here
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    // no data will be flushed till now
-    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
-    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
-    Assert.assertEquals(pendingWriteChunkCount,
-        XceiverClientManager.getXceiverClientMetrics()
-            .getPendingContainerOpCountMetrics(
-                ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount,
-        XceiverClientManager.getXceiverClientMetrics()
-            .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-
-    // commitIndex2FlushedData Map will be empty here
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
-
-    // Now do a flush. This will flush the data and update the flush length and
-    // the map.
-    key.flush();
-
-    // flush is a sync call, all pending operations will complete
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    // we have just written data less than a chunk size, the data will just sit
-    // in the buffer, with only one buffer being allocated in the buffer pool
-
-    Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
-    Assert.assertEquals(0,
-        blockOutputStream.getBufferPool().getBuffer(0).position());
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-    Assert.assertEquals(dataLength,
-        blockOutputStream.getTotalDataFlushedLength());
-    Assert.assertEquals(0,
-        blockOutputStream.getCommitIndex2flushedDataMap().size());
-
-    // flush ensures watchForCommit updates the total length acknowledged
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
-    // now close the stream, It will update the ack length after watchForCommit
-    key.close();
-
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 1,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 3,
-        metrics.getTotalOpCount());
-
-    // make sure the bufferPool is empty
-    Assert
-        .assertEquals(0, 
blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
-    validateData(keyName, data1);
+  static OzoneClientConfig newClientConfig(ConfigurationSource source,

Review Comment:
   That's a fair point.
   (Not for this PR, but we should update 
[ozone-style](https://github.com/apache/ozone/blob/master/hadoop-ozone/dev-support/intellij/ozone-style.xml)
 to align parameters in the same way as you suggested in that case.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to